diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md index 3aadcbac..54bb4754 100644 --- a/.claude/CLAUDE.md +++ b/.claude/CLAUDE.md @@ -84,7 +84,7 @@ golangci-lint run --timeout=10m --build-tags=safe - Creates Temporal client with interceptors - Starts Temporal workers -2. **Reset Flow** (`plugin.go:Reset()`, `ResetAP()`): +2. **Reset Flow** (`plugin.go:Reset()`): - Triggered by worker stop events - Stops Temporal workers, resets pools - Purges sticky workflow cache @@ -94,7 +94,7 @@ golangci-lint run --timeout=10m --build-tags=safe 3. **Event Handling**: - Subscribes to `EventWorkerStopped` events - Checks PID in event message to determine WF vs Activity worker - - Executes full reset for WF worker, activity-only reset for activity workers + - Workflow-worker death triggers a full reset; an activity-worker death needs no reset (the pool's watcher already replaces the single dead worker) ### Configuration diff --git a/go.mod b/go.mod index ee7c4009..bdf5a930 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.23.2 github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.68.0 // indirect + github.com/prometheus/common v0.68.1 // indirect github.com/prometheus/procfs v0.20.1 // indirect github.com/roadrunner-server/goridge/v3 v3.8.3 github.com/robfig/cron v1.2.0 // indirect diff --git a/go.sum b/go.sum index e2a227d4..25ecf3ee 100644 --- a/go.sum +++ b/go.sum @@ -165,8 +165,8 @@ github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvM github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= -github.com/prometheus/common v0.68.0 h1:8rQJvQmYltsR2L7h8Zw0Iyj8WYNNmpwikoQTZXwfVeA= 
-github.com/prometheus/common v0.68.0/go.mod h1:4soH+U8yJSROk7OJ//hmTiWKsxapv6zRGgTt3keN8gQ= +github.com/prometheus/common v0.68.1 h1:omjRRl4QP4komogpXuhfeOiisQg7xdy8VM1UY+pStaY= +github.com/prometheus/common v0.68.1/go.mod h1:ZzL3f6u94qUxh9p+tJTrF+FvBS1XXbbRAZCQkytAL0Y= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= @@ -230,10 +230,15 @@ github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.44.0 h1:JjwHmHpA4iZ3wBxluu2fbbE7j4kqlE8jXyAyPXH7HqU= +go.opentelemetry.io/otel v1.44.0/go.mod h1:BMgjTHL9WPRlRjL2oZCBTL4whCGtXch2H4BhOPIAyYc= go.opentelemetry.io/otel/metric v1.44.0 h1:1w0gILTcHdr3YI+ixLyjemwrVnsMURbTZFrSYCdDdmc= +go.opentelemetry.io/otel/metric v1.44.0/go.mod h1:8O7hanEPBNgEMmybD3s2VBKcgWOCsA6tzHBPODAiquo= go.opentelemetry.io/otel/sdk v1.44.0 h1:nHYwb9lK+fJPU/dnT6s7W7Z8itMWyqrnVfbheVYrZ58= +go.opentelemetry.io/otel/sdk v1.44.0/go.mod h1:Osuydd3Se74nqjAKxid74N5eC+jfEqfTegHRnq58oK0= go.opentelemetry.io/otel/sdk/metric v1.44.0 h1:3LlKgI+VjbVsjNRFZJZAJ30WjXC5VkNRks6si09iEfI= +go.opentelemetry.io/otel/sdk/metric v1.44.0/go.mod h1:5B5pMARnXxKhltooO4xUuCBorl65a4EpnTalObqOigA= go.opentelemetry.io/otel/trace v1.44.0 h1:jxF5CsGYCe74MCRx2X4g7WsY/VBKRqqpNvXlX/6gtIk= +go.opentelemetry.io/otel/trace v1.44.0/go.mod h1:oLl1jrMQAVo6v3GAggN+1VH9VIz9iUSvW53sW1Q8PIE= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA= go.temporal.io/api v1.62.13 
h1:xMa8Nt5oAMX+LvlCJA44wjTCc1H09i2rG9poB1/xvH4= diff --git a/plugin.go b/plugin.go index f620d9a0..340b4061 100644 --- a/plugin.go +++ b/plugin.go @@ -192,26 +192,29 @@ func (p *Plugin) Serve() chan error { for { select { case ev := <-p.events: - p.log.Debug("worker stopped, restarting pool and temporal workers", zap.String("message", ev.Message())) - - // check pid, message from the go sdk is: process exited, pid: 334455 <-- we are looking for this pid - // sdk 2.18.1 - // TODO: potential bug here, if the pid contains the WW pid, it will reset everything (btw, should not be a problem) + // The pool's worker-watcher already re-allocated a replacement for the + // dead worker before emitting this event. Activity workers are stateless + // and their tasks are retried by the Temporal server, so the replacement + // is enough. Only the stateful workflow worker needs a full reset (purge + // the sticky workflow cache and replay). + // + // The watcher message is "process exited, pid: <pid>". A workflow-worker + // death always contains its PID, so it always matches; the only + // imperfection is that an activity PID which contains the WF PID as a + // substring also triggers a full reset - a harmless over-reset, never a + // missed workflow-worker death. 
switch strings.Contains(ev.Message(), strconv.Itoa(p.wwPID)) { - // stopped workflow worker + // stopped workflow worker -> full reset case true: + p.log.Debug("workflow worker stopped, resetting", zap.String("message", ev.Message())) errR := p.Reset() if errR != nil { errCh <- errors.E(op, errors.Errorf("error during reset: %#v, event: %s", errR, ev.Message())) return } - // stopped one of the activity workers + // stopped one of the activity workers -> already replaced by the pool, nothing to do case false: - errR := p.ResetAP() - if errR != nil { - errCh <- errors.E(op, errors.Errorf("error during reset: %#v, event: %s", errR, ev.Message())) - return - } + p.log.Debug("activity worker stopped, replaced by the pool", zap.String("message", ev.Message())) } case <-p.stopCh: @@ -308,26 +311,6 @@ func (p *Plugin) Workers() []*process.State { return states } -func (p *Plugin) ResetAP() error { - const op = errors.Op("temporal_plugin_reset") - - p.mu.Lock() - defer p.mu.Unlock() - - p.log.Info("reset signal received, resetting activity pool") - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - - errAp := p.actP.Reset(ctx) - if errAp != nil { - return errors.E(op, errAp) - } - p.log.Info("activity pool restarted") - - return nil -} - func (p *Plugin) Reset() error { const op = errors.Op("temporal_reset") diff --git a/tests/go.mod b/tests/go.mod index 6b6b3eed..17d9fe7e 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -57,7 +57,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.68.0 // indirect + github.com/prometheus/common v0.68.1 // indirect github.com/prometheus/procfs v0.20.1 // indirect github.com/roadrunner-server/errors v1.5.0 // indirect github.com/roadrunner-server/events v1.0.1 // indirect diff --git a/tests/go.sum b/tests/go.sum 
index 27328f19..b8d698c1 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -183,8 +183,8 @@ github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvM github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= -github.com/prometheus/common v0.68.0 h1:8rQJvQmYltsR2L7h8Zw0Iyj8WYNNmpwikoQTZXwfVeA= -github.com/prometheus/common v0.68.0/go.mod h1:4soH+U8yJSROk7OJ//hmTiWKsxapv6zRGgTt3keN8gQ= +github.com/prometheus/common v0.68.1 h1:omjRRl4QP4komogpXuhfeOiisQg7xdy8VM1UY+pStaY= +github.com/prometheus/common v0.68.1/go.mod h1:ZzL3f6u94qUxh9p+tJTrF+FvBS1XXbbRAZCQkytAL0Y= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=