Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .claude/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ golangci-lint run --timeout=10m --build-tags=safe
- Creates Temporal client with interceptors
- Starts Temporal workers

2. **Reset Flow** (`plugin.go:Reset()`, `ResetAP()`):
2. **Reset Flow** (`plugin.go:Reset()`):
- Triggered by worker stop events
- Stops Temporal workers, resets pools
- Purges sticky workflow cache
Expand All @@ -94,7 +94,7 @@ golangci-lint run --timeout=10m --build-tags=safe
3. **Event Handling**:
- Subscribes to `EventWorkerStopped` events
- Checks PID in event message to determine WF vs Activity worker
- Executes full reset for WF worker, activity-only reset for activity workers
- Workflow-worker death triggers a full reset; an activity-worker death needs no reset (the pool's watcher already replaces the single dead worker)

### Configuration

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.68.0 // indirect
github.com/prometheus/common v0.68.1 // indirect
github.com/prometheus/procfs v0.20.1 // indirect
github.com/roadrunner-server/goridge/v3 v3.8.3
github.com/robfig/cron v1.2.0 // indirect
Expand Down
9 changes: 7 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvM
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.68.0 h1:8rQJvQmYltsR2L7h8Zw0Iyj8WYNNmpwikoQTZXwfVeA=
github.com/prometheus/common v0.68.0/go.mod h1:4soH+U8yJSROk7OJ//hmTiWKsxapv6zRGgTt3keN8gQ=
github.com/prometheus/common v0.68.1 h1:omjRRl4QP4komogpXuhfeOiisQg7xdy8VM1UY+pStaY=
github.com/prometheus/common v0.68.1/go.mod h1:ZzL3f6u94qUxh9p+tJTrF+FvBS1XXbbRAZCQkytAL0Y=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
Expand Down Expand Up @@ -230,10 +230,15 @@ github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.44.0 h1:JjwHmHpA4iZ3wBxluu2fbbE7j4kqlE8jXyAyPXH7HqU=
go.opentelemetry.io/otel v1.44.0/go.mod h1:BMgjTHL9WPRlRjL2oZCBTL4whCGtXch2H4BhOPIAyYc=
go.opentelemetry.io/otel/metric v1.44.0 h1:1w0gILTcHdr3YI+ixLyjemwrVnsMURbTZFrSYCdDdmc=
go.opentelemetry.io/otel/metric v1.44.0/go.mod h1:8O7hanEPBNgEMmybD3s2VBKcgWOCsA6tzHBPODAiquo=
go.opentelemetry.io/otel/sdk v1.44.0 h1:nHYwb9lK+fJPU/dnT6s7W7Z8itMWyqrnVfbheVYrZ58=
go.opentelemetry.io/otel/sdk v1.44.0/go.mod h1:Osuydd3Se74nqjAKxid74N5eC+jfEqfTegHRnq58oK0=
go.opentelemetry.io/otel/sdk/metric v1.44.0 h1:3LlKgI+VjbVsjNRFZJZAJ30WjXC5VkNRks6si09iEfI=
go.opentelemetry.io/otel/sdk/metric v1.44.0/go.mod h1:5B5pMARnXxKhltooO4xUuCBorl65a4EpnTalObqOigA=
go.opentelemetry.io/otel/trace v1.44.0 h1:jxF5CsGYCe74MCRx2X4g7WsY/VBKRqqpNvXlX/6gtIk=
go.opentelemetry.io/otel/trace v1.44.0/go.mod h1:oLl1jrMQAVo6v3GAggN+1VH9VIz9iUSvW53sW1Q8PIE=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA=
go.temporal.io/api v1.62.13 h1:xMa8Nt5oAMX+LvlCJA44wjTCc1H09i2rG9poB1/xvH4=
Expand Down
47 changes: 15 additions & 32 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,26 +192,29 @@ func (p *Plugin) Serve() chan error {
for {
select {
case ev := <-p.events:
p.log.Debug("worker stopped, restarting pool and temporal workers", zap.String("message", ev.Message()))

// check pid, message from the go sdk is: process exited, pid: 334455 <-- we are looking for this pid
// sdk 2.18.1
// TODO: potential bug here, if the pid contains the WW pid, it will reset everything (btw, should not be a problem)
// The pool's worker-watcher already re-allocated a replacement for the
// dead worker before emitting this event. Activity workers are stateless
// and their tasks are retried by the Temporal server, so the replacement
// is enough. Only the stateful workflow worker needs a full reset (purge
// the sticky workflow cache and replay).
//
// The watcher message is "process exited, pid: <PID>". A workflow-worker
// death always contains its PID, so it always matches; the only
// imperfection is that an activity PID which contains the WF PID as a
// substring also triggers a full reset - a harmless over-reset, never a
// missed workflow-worker death.
switch strings.Contains(ev.Message(), strconv.Itoa(p.wwPID)) {
Comment thread
rustatian marked this conversation as resolved.
// stopped workflow worker
// stopped workflow worker -> full reset
case true:
p.log.Debug("workflow worker stopped, resetting", zap.String("message", ev.Message()))
errR := p.Reset()
if errR != nil {
errCh <- errors.E(op, errors.Errorf("error during reset: %#v, event: %s", errR, ev.Message()))
return
}
// stopped one of the activity workers
// stopped one of the activity workers -> already replaced by the pool, nothing to do
case false:
errR := p.ResetAP()
if errR != nil {
errCh <- errors.E(op, errors.Errorf("error during reset: %#v, event: %s", errR, ev.Message()))
return
}
p.log.Debug("activity worker stopped, replaced by the pool", zap.String("message", ev.Message()))
}

case <-p.stopCh:
Expand Down Expand Up @@ -308,26 +311,6 @@ func (p *Plugin) Workers() []*process.State {
return states
}

func (p *Plugin) ResetAP() error {
const op = errors.Op("temporal_plugin_reset")

p.mu.Lock()
defer p.mu.Unlock()

p.log.Info("reset signal received, resetting activity pool")

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

errAp := p.actP.Reset(ctx)
if errAp != nil {
return errors.E(op, errAp)
}
p.log.Info("activity pool restarted")

return nil
}

func (p *Plugin) Reset() error {
const op = errors.Op("temporal_reset")

Expand Down
2 changes: 1 addition & 1 deletion tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.23.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.68.0 // indirect
github.com/prometheus/common v0.68.1 // indirect
github.com/prometheus/procfs v0.20.1 // indirect
github.com/roadrunner-server/errors v1.5.0 // indirect
github.com/roadrunner-server/events v1.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvM
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.68.0 h1:8rQJvQmYltsR2L7h8Zw0Iyj8WYNNmpwikoQTZXwfVeA=
github.com/prometheus/common v0.68.0/go.mod h1:4soH+U8yJSROk7OJ//hmTiWKsxapv6zRGgTt3keN8gQ=
github.com/prometheus/common v0.68.1 h1:omjRRl4QP4komogpXuhfeOiisQg7xdy8VM1UY+pStaY=
github.com/prometheus/common v0.68.1/go.mod h1:ZzL3f6u94qUxh9p+tJTrF+FvBS1XXbbRAZCQkytAL0Y=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
Expand Down
Loading