fix: ensure data published to MQTT is not dropped#305
fix: ensure data published to MQTT is not dropped#305paulstuart wants to merge 26 commits intodevelopfrom
Conversation
…il rotation is required
Vulnerability Scan: PassedImage:
Commit: d913d7f |
|
Go test coverage
Total coverage: 59.9% |
There was a problem hiding this comment.
Pull request overview
This PR aims to prevent telemetry/OTLP data from being dropped during MQTT disconnects caused by frequent credential rotation, by queueing publishes and refreshing JWT credentials automatically on MQTT auto-reconnect. It also adds a debug-only signal trigger for forcing rotation/inspecting token status and wires build tags through local builds and Docker builds.
Changes:
- Switch OTLP bridge publishing to
PublishViaQueueto buffer messages across MQTT disconnects. - Add an autopaho
ConnectPacketBuilderthat refreshes the JWT on auto-reconnect, and wire it fromFleetConfigManager. - Add a debug-only SIGUSR1/SIGUSR2 trigger for credential rotation/status logging, plus build-tag plumbing (
BUILD_TAGS) for make/Docker and a small golangci-lint config tweak.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| Makefile | Adds BUILD_TAGS plumbing into go build/go test and passes build args into Docker builds. |
| agent/otlpbridge/publisher_adapter.go | Uses PublishViaQueue to buffer OTLP publishes during disconnects. |
| agent/otlpbridge/mqtt.go | Uses PublishViaQueue for the OTLP bridge MQTT publisher implementation. |
| agent/docker/Dockerfile | Accepts/passes BUILD_TAGS into the make agent_bin build step. |
| agent/configmgr/fleet/token_refresh_test.go | Adds unit tests for the reconnect-time JWT refresh ConnectPacketBuilder behavior. |
| agent/configmgr/fleet/debug.go | Introduces a small interface used by debug-only trigger code (avoids package dependency). |
| agent/configmgr/fleet/debug_trigger.go | Adds debug-tag-only OS signal trigger to rotate/log credentials. |
| agent/configmgr/fleet/debug_trigger_test.go | Adds debug-tag-only tests for signal-trigger behavior. |
| agent/configmgr/fleet/debug_trigger_off.go | Provides a no-op StartDebugTrigger when not built with -tags debug. |
| agent/configmgr/fleet/connection.go | Adds token refresher plumbing, ConnectPacketBuilder for auto-reconnect JWT refresh, and dispatch shutdown race handling changes. |
| agent/configmgr/fleet.go | Wires token refresher into the MQTT connection; starts debug triggers; implements debug credential methods. |
| .github/golangci.yaml | Enables relative-path-mode: gomod for lint output paths. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Somewhat moot as the debug code will go away once code is guaranteed solid Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…d require refresh. An 'expected' error that is recoverable and part of the process should not be treated as a real error. The goal is that the agent should never log an issue as an error unless it is a real problem and we don't get blind to 'normal errors'
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| s.pendingMu.Lock() | ||
| if s.ready { | ||
| s.pendingMu.Unlock() | ||
| return | ||
| } |
There was a problem hiding this comment.
drainPending can run concurrently (e.g., if multiple goroutines call SetPublisher/Set*Topic when all fields are already set). Because s.ready is only set to true at the end of the drain, a second drainPending invocation can observe ready == false, do no work, and set ready = true while the first drain is still publishing. That allows new Enqueue calls to publish directly and overtake queued messages (breaking FIFO ordering). Consider adding a separate draining/drainInProgress flag under pendingMu (or a sync.Once + a draining state) so only one drain can proceed and Enqueue keeps queueing until the active drain completes.
| // Enqueue marshaled OTLP data for publishing. Before the MQTT connection is | ||
| // ready the payload is queued in memory (up to maxPending messages; oldest | ||
| // messages are dropped when the queue is full). Once ready, publishes directly. | ||
| func (s *BridgeServer) Enqueue(ctx context.Context, isIngest bool, payload []byte) error { | ||
| s.pendingMu.Lock() | ||
| if !s.ready { | ||
| if s.maxPending > 0 && len(s.pending) >= s.maxPending { | ||
| s.pendingDropped++ |
There was a problem hiding this comment.
The doc comment for Enqueue says “oldest messages are dropped when the queue is full”, but the implementation actually rejects the new message and preserves the existing queue (see the len(s.pending) >= s.maxPending branch returning ResourceExhausted). Please either update the comment to match the behavior (reject-new), or change the implementation to evict the oldest entry when at capacity.
|
|
||
| func TestBridge_Enqueue_QueuesDrainsOnReady(t *testing.T) { | ||
| fp := &fakePublisher{} | ||
| bridge := &BridgeServer{enc: ProtobufEncoder{}} |
There was a problem hiding this comment.
This test constructs BridgeServer via a struct literal (&BridgeServer{...}), which bypasses NewBridgeServer initialization (notably maxPending defaulting). That can make tests diverge from real runtime behavior (e.g., maxPending stays 0 → effectively unbounded). Prefer constructing the bridge via NewBridgeServer(...) (or explicitly set maxPending in the literal) so queue semantics match production.
| bridge := &BridgeServer{enc: ProtobufEncoder{}} | |
| bridge := NewBridgeServer(ProtobufEncoder{}) |
…oken' vs Warn for 'there were issues trying to do something but it's all good now'
As the MQTT credentials rotate frequently, it encounters situations where the connection is invalid and the data is unable to be sent.
This leverages the queuing capability in the handler and ensures that normal credentials rotation does not drop data, nor pollute the logs with errors that are "part of doing business"