From 5f9a9420f1cc9cf2874ababb12527688c4d94173 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Mon, 2 Mar 2026 12:40:01 +0000 Subject: [PATCH 1/9] feat: add Prometheus metrics for observability Add Prometheus metrics to expose request success/failure counts and error types so operators can monitor and alert on API communication issues. New metrics: - coder_logstream_requests_total{status} - tracks success/failure counts - coder_logstream_errors_total{type} - tracks errors by type (network) New flag: - --metrics-addr / CODER_LOGSTREAM_METRICS_ADDR (default :9100) Starts an HTTP server serving /metrics for Prometheus scraping. Instrumented paths: - PostLogSource failures - ConnectRPC28WithRole / ConnectRPC20 failures - Log Send failures - All success paths Resolves #173, partially addresses #146 --- go.mod | 4 ++-- logger.go | 10 ++++++++++ main.go | 25 ++++++++++++++++++++++++- metrics.go | 28 ++++++++++++++++++++++++++++ metrics_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 109 insertions(+), 3 deletions(-) create mode 100644 metrics.go create mode 100644 metrics_test.go diff --git a/go.mod b/go.mod index 14a2e20..9224526 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,8 @@ require ( github.com/go-chi/chi/v5 v5.2.2 github.com/google/uuid v1.6.0 github.com/hashicorp/yamux v0.1.2 + github.com/prometheus/client_golang v1.23.2 + github.com/prometheus/client_model v0.6.2 github.com/stretchr/testify v1.11.1 golang.org/x/mod v0.33.0 google.golang.org/protobuf v1.36.11 @@ -177,8 +179,6 @@ require ( github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect - github.com/prometheus/client_golang v1.23.2 // indirect - github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.67.4 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect diff --git a/logger.go b/logger.go index 0d0a7a4..94e1de3 100644 --- a/logger.go +++ b/logger.go @@ -571,6 +571,8 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLif // Posting the log source failed, which affects how logs appear. // We'll retry to ensure the log source is properly registered. logger.Error(ctx, "post log source", slog.Error(err)) + requestsTotal.WithLabelValues("failure").Inc() + errorsTotal.WithLabelValues("network").Inc() return agentLoggerLifecycle{}, err } @@ -598,6 +600,8 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLif arpc, _, err := client.ConnectRPC28WithRole(gracefulCtx, "logstream-kube") if err != nil { logger.Error(ctx, "drpc connect with role", slog.Error(err)) + requestsTotal.WithLabelValues("failure").Inc() + errorsTotal.WithLabelValues("network").Inc() gracefulCancel() return agentLoggerLifecycle{}, err } @@ -607,12 +611,15 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLif arpc, err := client.ConnectRPC20(gracefulCtx) if err != nil { logger.Error(ctx, "drpc connect", slog.Error(err)) + requestsTotal.WithLabelValues("failure").Inc() + errorsTotal.WithLabelValues("network").Inc() gracefulCancel() return agentLoggerLifecycle{}, err } logDest = arpc rpcConn = arpc.DRPCConn() } + requestsTotal.WithLabelValues("success").Inc() go func() { err := ls.SendLoop(gracefulCtx, logDest) // if the send loop exits on its own without the context @@ -691,9 +698,12 @@ func (l *logQueuer) processLog(ctx context.Context, log agentLog) { return } if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil { + requestsTotal.WithLabelValues("failure").Inc() + errorsTotal.WithLabelValues("network").Inc() l.scheduleRetry(ctx, log.agentToken) return } + requestsTotal.WithLabelValues("success").Inc() l.clearRetryLocked(log.agentToken) l.logCache.delete(log.agentToken) } diff --git a/main.go b/main.go index 974ad0f..49b5098 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "errors" "fmt" + "net/http" "net/url" "os" "strings" @@ -30,6 +31,7 @@ func root() *serpent.Command { kubeConfig string namespacesStr string labelSelector string + metricsAddr string ) cmd := &serpent.Command{ Use: "coder-logstream-kube", @@ -73,6 +75,14 @@ func root() *serpent.Command { Value: serpent.StringOf(&labelSelector), Description: "Label selector to use when listing pods.", }, + { + Name: "metrics-addr", + Flag: "metrics-addr", + Env: "CODER_LOGSTREAM_METRICS_ADDR", + Default: ":9100", + Value: serpent.StringOf(&metricsAddr), + Description: "Address to serve Prometheus metrics on. Set to empty to disable.", + }, }, Handler: func(inv *serpent.Invocation) error { if coderURL == "" { @@ -115,13 +125,15 @@ func root() *serpent.Command { } } + logger := slog.Make(sloghuman.Sink(inv.Stderr)).Leveled(slog.LevelDebug) + reporter, err := newPodEventLogger(inv.Context(), podEventLoggerOptions{ coderURL: parsedURL, client: client, namespaces: namespaces, fieldSelector: fieldSelector, labelSelector: labelSelector, - logger: slog.Make(sloghuman.Sink(inv.Stderr)).Leveled(slog.LevelDebug), + logger: logger, maxRetries: 15, // 15 retries is the default max retries for a log send failure. }) if err != nil { @@ -130,6 +142,17 @@ func root() *serpent.Command { defer func() { _ = reporter.Close() }() + + if metricsAddr != "" { + mux := http.NewServeMux() + mux.Handle("/metrics", metricsHandler()) + go func() { + if err := http.ListenAndServe(metricsAddr, mux); err != nil { + logger.Error(inv.Context(), "metrics server failed", slog.Error(err)) + } + }() + } + select { case err := <-reporter.errChan: return fmt.Errorf("pod event reporter: %w", err) diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..b07e6c9 --- /dev/null +++ b/metrics.go @@ -0,0 +1,28 @@ +package main + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var ( + requestsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "coder_logstream_requests_total", + Help: "Total number of requests to the Coder API.", + }, []string{"status"}) // "success" | "failure" + + errorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "coder_logstream_errors_total", + Help: "Total number of errors by type.", + }, []string{"type"}) // "auth" | "network" | "parse" +) + +func init() { + prometheus.MustRegister(requestsTotal, errorsTotal) +} + +func metricsHandler() http.Handler { + return promhttp.Handler() +} diff --git a/metrics_test.go b/metrics_test.go new file mode 100644 index 0000000..d22c3ae --- /dev/null +++ b/metrics_test.go @@ -0,0 +1,45 @@ +package main + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +func getCounterValue(t *testing.T, cv *prometheus.CounterVec, label string) float64 { + t.Helper() + m := &dto.Metric{} + c, err := cv.GetMetricWithLabelValues(label) + require.NoError(t, err) + require.NoError(t, c.Write(m)) + return m.GetCounter().GetValue() +} + +func TestMetricsIncrement(t *testing.T) { + t.Parallel() + + // Record baseline values (metrics are global and may have been + // incremented by other tests running in the same process). + baseSuccess := getCounterValue(t, requestsTotal, "success") + baseFailure := getCounterValue(t, requestsTotal, "failure") + baseNetwork := getCounterValue(t, errorsTotal, "network") + + // Simulate success + requestsTotal.WithLabelValues("success").Inc() + require.Equal(t, baseSuccess+1, getCounterValue(t, requestsTotal, "success")) + + // Simulate failure + requestsTotal.WithLabelValues("failure").Inc() + errorsTotal.WithLabelValues("network").Inc() + require.Equal(t, baseFailure+1, getCounterValue(t, requestsTotal, "failure")) + require.Equal(t, baseNetwork+1, getCounterValue(t, errorsTotal, "network")) +} + +func TestMetricsHandler(t *testing.T) { + t.Parallel() + + handler := metricsHandler() + require.NotNil(t, handler) +} From 15c77351f405b8a8a7e7943964bc9227afa576fb Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Mon, 2 Mar 2026 13:43:55 +0000 Subject: [PATCH 2/9] feat: update Helm chart for metrics, default to disabled for backward compat --- helm/templates/service.yaml | 19 +++++++++++++++++++ helm/values.yaml | 7 +++++++ main.go | 2 +- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/helm/templates/service.yaml b/helm/templates/service.yaml index 23473f5..eb10b23 100644 --- a/helm/templates/service.yaml +++ b/helm/templates/service.yaml @@ -82,6 +82,12 @@ spec: {{- with .Values.labels }} {{- toYaml . | nindent 8 }} {{- end }} + {{- if .Values.metrics.enabled }} + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: {{ .Values.metrics.port | quote }} + prometheus.io/path: "/metrics" + {{- end }} spec: serviceAccountName: {{ .Values.serviceAccount.name | quote }} restartPolicy: Always @@ -111,10 +117,23 @@ spec: imagePullPolicy: {{ .Values.image.pullPolicy }} command: - /coder-logstream-kube + {{- if .Values.metrics.enabled }} + ports: + - name: metrics + containerPort: {{ .Values.metrics.port }} + protocol: TCP + {{- end }} resources: {{ toYaml .Values.resources | nindent 12 }} env: - name: CODER_URL value: {{ .Values.url }} + {{- if .Values.metrics.enabled }} + - name: CODER_LOGSTREAM_METRICS_ADDR + value: ":{{ .Values.metrics.port }}" + {{- else }} + - name: CODER_LOGSTREAM_METRICS_ADDR + value: "" + {{- end }} {{- if .Values.namespaces }} - name: CODER_NAMESPACES value: {{ join "," .Values.namespaces }} diff --git a/helm/values.yaml b/helm/values.yaml index daa847f..e667225 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -86,6 +86,13 @@ tolerations: # value: "value" # effect: "NoSchedule" +# metrics -- Prometheus metrics configuration. +metrics: + # metrics.enabled -- Whether to expose a Prometheus /metrics endpoint. + enabled: false + # metrics.port -- The port to serve Prometheus metrics on. + port: 9100 + # labels -- The pod labels for coder-logstream-kube. See: # https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ labels: {} diff --git a/main.go b/main.go index 49b5098..5741ca5 100644 --- a/main.go +++ b/main.go @@ -79,7 +79,7 @@ func root() *serpent.Command { Name: "metrics-addr", Flag: "metrics-addr", Env: "CODER_LOGSTREAM_METRICS_ADDR", - Default: ":9100", + Default: "", Value: serpent.StringOf(&metricsAddr), Description: "Address to serve Prometheus metrics on. Set to empty to disable.", }, From d866562d0d2e25c7c15baaf6efbfae4693e15297 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Mon, 2 Mar 2026 13:45:19 +0000 Subject: [PATCH 3/9] test: add end-to-end metrics endpoint test --- metrics_test.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/metrics_test.go b/metrics_test.go index d22c3ae..3e138a7 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -1,7 +1,12 @@ package main import ( + "io" + "net" + "net/http" + "strings" "testing" + "time" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" @@ -43,3 +48,44 @@ func TestMetricsHandler(t *testing.T) { handler := metricsHandler() require.NotNil(t, handler) } + +func TestMetricsEndpoint(t *testing.T) { + t.Parallel() + + // Pick a random free port. + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + addr := listener.Addr().String() + _ = listener.Close() + + mux := http.NewServeMux() + mux.Handle("/metrics", metricsHandler()) + srv := &http.Server{Addr: addr, Handler: mux} + go func() { _ = srv.ListenAndServe() }() + t.Cleanup(func() { _ = srv.Close() }) + + // Wait for the server to be ready. + require.Eventually(t, func() bool { + resp, err := http.Get("http://" + addr + "/metrics") + if err != nil { + return false + } + _ = resp.Body.Close() + return resp.StatusCode == http.StatusOK + }, 2*time.Second, 50*time.Millisecond) + + // Bump a counter and verify it appears in the output. + requestsTotal.WithLabelValues("success").Inc() + + resp, err := http.Get("http://" + addr + "/metrics") + require.NoError(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + require.True(t, strings.Contains(string(body), "coder_logstream_requests_total"), + "expected coder_logstream_requests_total in metrics output") + require.True(t, strings.Contains(string(body), "coder_logstream_errors_total"), + "expected coder_logstream_errors_total in metrics output") +} From c781727bb77db4f7542e0885a8a0777178c6f132 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Mon, 2 Mar 2026 13:48:05 +0000 Subject: [PATCH 4/9] fix: initialize metric labels so they appear at zero in /metrics --- metrics.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/metrics.go b/metrics.go index b07e6c9..5a2d5eb 100644 --- a/metrics.go +++ b/metrics.go @@ -21,6 +21,12 @@ var ( func init() { prometheus.MustRegister(requestsTotal, errorsTotal) + + // Initialize all label combinations so they appear in /metrics output + // even before the first increment. + requestsTotal.WithLabelValues("success") + requestsTotal.WithLabelValues("failure") + errorsTotal.WithLabelValues("network") } func metricsHandler() http.Handler { From 2b15e2ee737d1a7be21dd703ad1d82d982e3ac71 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Mon, 2 Mar 2026 14:02:08 +0000 Subject: [PATCH 5/9] fix: handle errcheck lint for resp.Body.Close in test --- metrics_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics_test.go b/metrics_test.go index 3e138a7..a40cdde 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -79,7 +79,7 @@ func TestMetricsEndpoint(t *testing.T) { resp, err := http.Get("http://" + addr + "/metrics") require.NoError(t, err) - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() body, err := io.ReadAll(resp.Body) require.NoError(t, err) From b596302fa85ab4835f8e4dda36f33f5fb3d667b3 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Tue, 3 Mar 2026 10:02:38 +0000 Subject: [PATCH 6/9] refactor: move metrics into instrumentedClient wrapper, remove errorsTotal Address review feedback from @deansheather: - Replace scattered metric calls with an instrumentedClient wrapper around agentsdk.Client that records metrics in one place - Remove redundant errorsTotal metric (was always incremented alongside requestsTotal failure) - Add method label to requestsTotal for better granularity: PostLogSource, ConnectRPC, SendLog --- logger.go | 44 ++++++---------------------- metrics.go | 76 +++++++++++++++++++++++++++++++++++++++++-------- metrics_test.go | 36 +++++++++++------------ 3 files changed, 91 insertions(+), 65 deletions(-) diff --git a/logger.go b/logger.go index 94e1de3..6b432b4 100644 --- a/logger.go +++ b/logger.go @@ -21,7 +21,6 @@ import ( "cdr.dev/slog/v3" "golang.org/x/mod/semver" - "storj.io/drpc" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" @@ -557,7 +556,7 @@ func (l *logQueuer) cleanup() { } func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) { - client := agentsdk.New(l.coderURL, agentsdk.WithFixedToken(log.agentToken)) + client := newInstrumentedClient(l.coderURL, log.agentToken) logger := l.logger.With(slog.F("resource_name", log.resourceName)) client.SDK.SetLogger(logger) @@ -571,8 +570,6 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLif // Posting the log source failed, which affects how logs appear. // We'll retry to ensure the log source is properly registered. logger.Error(ctx, "post log source", slog.Error(err)) - requestsTotal.WithLabelValues("failure").Inc() - errorsTotal.WithLabelValues("network").Inc() return agentLoggerLifecycle{}, err } @@ -592,34 +589,12 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLif } supportsRole := buildInfoErr == nil && versionAtLeast(buildInfo.Version, "v2.31.0") - var ( - logDest agentsdk.LogDest - rpcConn drpc.Conn - ) - if supportsRole { - arpc, _, err := client.ConnectRPC28WithRole(gracefulCtx, "logstream-kube") - if err != nil { - logger.Error(ctx, "drpc connect with role", slog.Error(err)) - requestsTotal.WithLabelValues("failure").Inc() - errorsTotal.WithLabelValues("network").Inc() - gracefulCancel() - return agentLoggerLifecycle{}, err - } - logDest = arpc - rpcConn = arpc.DRPCConn() - } else { - arpc, err := client.ConnectRPC20(gracefulCtx) - if err != nil { - logger.Error(ctx, "drpc connect", slog.Error(err)) - requestsTotal.WithLabelValues("failure").Inc() - errorsTotal.WithLabelValues("network").Inc() - gracefulCancel() - return agentLoggerLifecycle{}, err - } - logDest = arpc - rpcConn = arpc.DRPCConn() + logDest, rpcConn, err := client.connectLogDest(gracefulCtx, supportsRole) + if err != nil { + logger.Error(ctx, "drpc connect", slog.Error(err)) + gracefulCancel() + return agentLoggerLifecycle{}, err } - requestsTotal.WithLabelValues("success").Inc() go func() { err := ls.SendLoop(gracefulCtx, logDest) // if the send loop exits on its own without the context @@ -697,13 +672,12 @@ func (l *logQueuer) processLog(ctx context.Context, log agentLog) { if len(queuedLogs) == 0 { return } - if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil { - requestsTotal.WithLabelValues("failure").Inc() - errorsTotal.WithLabelValues("network").Inc() + sendErr := lgr.scriptLogger.Send(ctx, queuedLogs...) + recordSendResult(sendErr) + if sendErr != nil { l.scheduleRetry(ctx, log.agentToken) return } - requestsTotal.WithLabelValues("success").Inc() l.clearRetryLocked(log.agentToken) l.logCache.delete(log.agentToken) } diff --git a/metrics.go b/metrics.go index 5a2d5eb..529fd5d 100644 --- a/metrics.go +++ b/metrics.go @@ -1,34 +1,86 @@ package main import ( + "context" "net/http" + "net/url" + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "storj.io/drpc" ) var ( requestsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "coder_logstream_requests_total", Help: "Total number of requests to the Coder API.", - }, []string{"status"}) // "success" | "failure" - - errorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "coder_logstream_errors_total", - Help: "Total number of errors by type.", - }, []string{"type"}) // "auth" | "network" | "parse" + }, []string{"method", "status"}) ) func init() { - prometheus.MustRegister(requestsTotal, errorsTotal) + prometheus.MustRegister(requestsTotal) - // Initialize all label combinations so they appear in /metrics output - // even before the first increment. - requestsTotal.WithLabelValues("success") - requestsTotal.WithLabelValues("failure") - errorsTotal.WithLabelValues("network") + // Initialize label combinations so they appear in /metrics at zero. + for _, method := range []string{"PostLogSource", "ConnectRPC", "SendLog"} { + requestsTotal.WithLabelValues(method, "success") + requestsTotal.WithLabelValues(method, "failure") + } } func metricsHandler() http.Handler { return promhttp.Handler() } + +// record is a helper that increments the appropriate request counter. +func record(method string, err error) { + if err != nil { + requestsTotal.WithLabelValues(method, "failure").Inc() + } else { + requestsTotal.WithLabelValues(method, "success").Inc() + } +} + +// instrumentedClient wraps agentsdk.Client to record Prometheus metrics +// on every API call. This keeps metric instrumentation in one place +// rather than scattered across call sites. +type instrumentedClient struct { + *agentsdk.Client +} + +func newInstrumentedClient(coderURL *url.URL, token string) *instrumentedClient { + return &instrumentedClient{ + Client: agentsdk.New(coderURL, agentsdk.WithFixedToken(token)), + } +} + +func (c *instrumentedClient) PostLogSource(ctx context.Context, req agentsdk.PostLogSourceRequest) (codersdk.WorkspaceAgentLogSource, error) { + resp, err := c.Client.PostLogSource(ctx, req) + record("PostLogSource", err) + return resp, err +} + +// connectLogDest establishes the appropriate RPC connection based on +// server capabilities, recording metrics for the attempt. +func (c *instrumentedClient) connectLogDest(ctx context.Context, supportsRole bool) (agentsdk.LogDest, drpc.Conn, error) { + if supportsRole { + arpc, _, err := c.Client.ConnectRPC28WithRole(ctx, "logstream-kube") + record("ConnectRPC", err) + if err != nil { + return nil, nil, err + } + return arpc, arpc.DRPCConn(), nil + } + arpc, err := c.Client.ConnectRPC20(ctx) + record("ConnectRPC", err) + if err != nil { + return nil, nil, err + } + return arpc, arpc.DRPCConn(), nil +} + +// recordSendResult records the result of a log send operation. +func recordSendResult(err error) { + record("SendLog", err) +} diff --git a/metrics_test.go b/metrics_test.go index a40cdde..5049ea8 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -13,10 +13,10 @@ import ( "github.com/stretchr/testify/require" ) -func getCounterValue(t *testing.T, cv *prometheus.CounterVec, label string) float64 { +func getCounterValue(t *testing.T, cv *prometheus.CounterVec, labels ...string) float64 { t.Helper() m := &dto.Metric{} - c, err := cv.GetMetricWithLabelValues(label) + c, err := cv.GetMetricWithLabelValues(labels...) require.NoError(t, err) require.NoError(t, c.Write(m)) return m.GetCounter().GetValue() @@ -27,19 +27,21 @@ func TestMetricsIncrement(t *testing.T) { // Record baseline values (metrics are global and may have been // incremented by other tests running in the same process). - baseSuccess := getCounterValue(t, requestsTotal, "success") - baseFailure := getCounterValue(t, requestsTotal, "failure") - baseNetwork := getCounterValue(t, errorsTotal, "network") - - // Simulate success - requestsTotal.WithLabelValues("success").Inc() - require.Equal(t, baseSuccess+1, getCounterValue(t, requestsTotal, "success")) - - // Simulate failure - requestsTotal.WithLabelValues("failure").Inc() - errorsTotal.WithLabelValues("network").Inc() - require.Equal(t, baseFailure+1, getCounterValue(t, requestsTotal, "failure")) - require.Equal(t, baseNetwork+1, getCounterValue(t, errorsTotal, "network")) + baseSuccess := getCounterValue(t, requestsTotal, "PostLogSource", "success") + baseFailure := getCounterValue(t, requestsTotal, "PostLogSource", "failure") + baseSendSuccess := getCounterValue(t, requestsTotal, "SendLog", "success") + + // Simulate success via record helper + record("PostLogSource", nil) + require.Equal(t, baseSuccess+1, getCounterValue(t, requestsTotal, "PostLogSource", "success")) + + // Simulate failure via record helper + record("PostLogSource", io.ErrUnexpectedEOF) + require.Equal(t, baseFailure+1, getCounterValue(t, requestsTotal, "PostLogSource", "failure")) + + // Simulate send success + recordSendResult(nil) + require.Equal(t, baseSendSuccess+1, getCounterValue(t, requestsTotal, "SendLog", "success")) } func TestMetricsHandler(t *testing.T) { @@ -75,7 +77,7 @@ func TestMetricsEndpoint(t *testing.T) { }, 2*time.Second, 50*time.Millisecond) // Bump a counter and verify it appears in the output. - requestsTotal.WithLabelValues("success").Inc() + record("PostLogSource", nil) resp, err := http.Get("http://" + addr + "/metrics") require.NoError(t, err) @@ -86,6 +88,4 @@ func TestMetricsEndpoint(t *testing.T) { require.True(t, strings.Contains(string(body), "coder_logstream_requests_total"), "expected coder_logstream_requests_total in metrics output") - require.True(t, strings.Contains(string(body), "coder_logstream_errors_total"), - "expected coder_logstream_errors_total in metrics output") } From 927b65f110b28d916cbdad4f9e99159f03417e8a Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Tue, 3 Mar 2026 10:07:36 +0000 Subject: [PATCH 7/9] fix: remove redundant embedded field from selector (staticcheck QF1008) --- metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metrics.go b/metrics.go index 529fd5d..64dab50 100644 --- a/metrics.go +++ b/metrics.go @@ -65,14 +65,14 @@ func (c *instrumentedClient) PostLogSource(ctx context.Context, req agentsdk.Pos // server capabilities, recording metrics for the attempt. func (c *instrumentedClient) connectLogDest(ctx context.Context, supportsRole bool) (agentsdk.LogDest, drpc.Conn, error) { if supportsRole { - arpc, _, err := c.Client.ConnectRPC28WithRole(ctx, "logstream-kube") + arpc, _, err := c.ConnectRPC28WithRole(ctx, "logstream-kube") record("ConnectRPC", err) if err != nil { return nil, nil, err } return arpc, arpc.DRPCConn(), nil } - arpc, err := c.Client.ConnectRPC20(ctx) + arpc, err := c.ConnectRPC20(ctx) record("ConnectRPC", err) if err != nil { return nil, nil, err From cd7784aca785cdc21f85dfe3344709e131698b0d Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Tue, 3 Mar 2026 14:56:55 +0000 Subject: [PATCH 8/9] refactor: use custom prometheus registry and method enum Address review feedback: - Replace global prometheus metrics with a metricsCollector struct holding a custom prometheus.Registry, passed to all components - Use requestMethod enum (methodPostLogSource, methodConnectRPC, methodSendLog) instead of raw strings - Tests create fresh metricsCollector instances, avoiding flakes from shared global state in parallel test execution --- logger.go | 12 +++++++-- logger_test.go | 6 +++++ main.go | 4 ++- metrics.go | 71 ++++++++++++++++++++++++++++++++----------------- metrics_test.go | 36 ++++++++++++++----------- 5 files changed, 85 insertions(+), 44 deletions(-) diff --git a/logger.go b/logger.go index 6b432b4..8cbd050 100644 --- a/logger.go +++ b/logger.go @@ -42,6 +42,8 @@ type podEventLoggerOptions struct { // maxRetries is the maximum number of retries for a log send failure. maxRetries int + metrics *metricsCollector + // The following fields are optional! namespaces []string fieldSelector string @@ -62,6 +64,9 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve if opts.maxRetries == 0 { opts.maxRetries = 10 } + if opts.metrics == nil { + opts.metrics = newMetricsCollector() + } logCh := make(chan agentLog, 512) ctx, cancelFunc := context.WithCancel(ctx) @@ -87,6 +92,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve logs: map[string][]agentsdk.Log{}, }, maxRetries: opts.maxRetries, + metrics: opts.metrics, }, doneChan: make(chan struct{}), } @@ -519,6 +525,8 @@ type logQueuer struct { retries map[string]*retryState // maxRetries is the maximum number of retries for a log send failure. maxRetries int + + metrics *metricsCollector } func (l *logQueuer) work(ctx context.Context, done chan struct{}) { @@ -556,7 +564,7 @@ func (l *logQueuer) cleanup() { } func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) { - client := newInstrumentedClient(l.coderURL, log.agentToken) + client := newInstrumentedClient(l.coderURL, log.agentToken, l.metrics) logger := l.logger.With(slog.F("resource_name", log.resourceName)) client.SDK.SetLogger(logger) @@ -673,7 +681,7 @@ func (l *logQueuer) processLog(ctx context.Context, log agentLog) { return } sendErr := lgr.scriptLogger.Send(ctx, queuedLogs...) - recordSendResult(sendErr) + l.metrics.record(methodSendLog, sendErr) if sendErr != nil { l.scheduleRetry(ctx, log.agentToken) return diff --git a/logger_test.go b/logger_test.go index 8a76de5..ee6c469 100644 --- a/logger_test.go +++ b/logger_test.go @@ -662,6 +662,7 @@ func Test_logQueuer(t *testing.T) { ch := make(chan agentLog) lq := &logQueuer{ + metrics: newMetricsCollector(), logger: slogtest.Make(t, nil), clock: clock, q: ch, @@ -728,6 +729,7 @@ func Test_logQueuer(t *testing.T) { IgnoreErrors: true, }) lq := &logQueuer{ + metrics: newMetricsCollector(), logger: logger, clock: clock, q: ch, @@ -815,6 +817,7 @@ func Test_logQueuer(t *testing.T) { clock := quartz.NewMock(t) ch := make(chan agentLog, 10) lq := &logQueuer{ + metrics: newMetricsCollector(), logger: slogtest.Make(t, nil), clock: clock, q: ch, @@ -854,6 +857,7 @@ func Test_logQueuer(t *testing.T) { clock := quartz.NewMock(t) ch := make(chan agentLog, 10) lq := &logQueuer{ + metrics: newMetricsCollector(), logger: slogtest.Make(t, nil), clock: clock, q: ch, @@ -890,6 +894,7 @@ func Test_logQueuer(t *testing.T) { IgnoreErrors: true, }) lq := &logQueuer{ + metrics: newMetricsCollector(), logger: logger, clock: clock, q: ch, @@ -1098,6 +1103,7 @@ func Test_logCache(t *testing.T) { ch := make(chan agentLog, 10) lq := &logQueuer{ + metrics: newMetricsCollector(), logger: slogtest.Make(t, nil), clock: clock, q: ch, diff --git a/main.go b/main.go index 5741ca5..d644a7b 100644 --- a/main.go +++ b/main.go @@ -126,6 +126,7 @@ func root() *serpent.Command { } logger := slog.Make(sloghuman.Sink(inv.Stderr)).Leveled(slog.LevelDebug) + metrics := newMetricsCollector() reporter, err := newPodEventLogger(inv.Context(), podEventLoggerOptions{ coderURL: parsedURL, @@ -135,6 +136,7 @@ func root() *serpent.Command { labelSelector: labelSelector, logger: logger, maxRetries: 15, // 15 retries is the default max retries for a log send failure. + metrics: metrics, }) if err != nil { return fmt.Errorf("create pod event reporter: %w", err) @@ -145,7 +147,7 @@ func root() *serpent.Command { if metricsAddr != "" { mux := http.NewServeMux() - mux.Handle("/metrics", metricsHandler()) + mux.Handle("/metrics", metrics.handler()) go func() { if err := http.ListenAndServe(metricsAddr, mux); err != nil { logger.Error(inv.Context(), "metrics server failed", slog.Error(err)) diff --git a/metrics.go b/metrics.go index 64dab50..d023f8d 100644 --- a/metrics.go +++ b/metrics.go @@ -12,33 +12,57 @@ import ( "storj.io/drpc" ) -var ( - requestsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ +// requestMethod identifies the API method being called. +type requestMethod string + +const ( + methodPostLogSource requestMethod = "PostLogSource" + methodConnectRPC requestMethod = "ConnectRPC" + methodSendLog requestMethod = "SendLog" +) + +// allMethods is used to pre-initialize all label combinations. +var allMethods = []requestMethod{methodPostLogSource, methodConnectRPC, methodSendLog} + +// metricsCollector holds Prometheus metrics for the application. +// It uses a custom registry so metrics are not global, making +// tests deterministic and avoiding flakes from parallel execution. +type metricsCollector struct { + registry *prometheus.Registry + requestsTotal *prometheus.CounterVec +} + +func newMetricsCollector() *metricsCollector { + requestsTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "coder_logstream_requests_total", Help: "Total number of requests to the Coder API.", }, []string{"method", "status"}) -) -func init() { - prometheus.MustRegister(requestsTotal) + registry := prometheus.NewRegistry() + registry.MustRegister(requestsTotal) + + // Initialize all label combinations so they appear in /metrics at zero. + for _, method := range allMethods { + requestsTotal.WithLabelValues(string(method), "success") + requestsTotal.WithLabelValues(string(method), "failure") + } - // Initialize label combinations so they appear in /metrics at zero. - for _, method := range []string{"PostLogSource", "ConnectRPC", "SendLog"} { - requestsTotal.WithLabelValues(method, "success") - requestsTotal.WithLabelValues(method, "failure") + return &metricsCollector{ + registry: registry, + requestsTotal: requestsTotal, } } -func metricsHandler() http.Handler { - return promhttp.Handler() +func (m *metricsCollector) handler() http.Handler { + return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}) } -// record is a helper that increments the appropriate request counter. -func record(method string, err error) { +// record increments the appropriate request counter. +func (m *metricsCollector) record(method requestMethod, err error) { if err != nil { - requestsTotal.WithLabelValues(method, "failure").Inc() + m.requestsTotal.WithLabelValues(string(method), "failure").Inc() } else { - requestsTotal.WithLabelValues(method, "success").Inc() + m.requestsTotal.WithLabelValues(string(method), "success").Inc() } } @@ -47,17 +71,19 @@ func record(method string, err error) { // rather than scattered across call sites. type instrumentedClient struct { *agentsdk.Client + metrics *metricsCollector } -func newInstrumentedClient(coderURL *url.URL, token string) *instrumentedClient { +func newInstrumentedClient(coderURL *url.URL, token string, metrics *metricsCollector) *instrumentedClient { return &instrumentedClient{ - Client: agentsdk.New(coderURL, agentsdk.WithFixedToken(token)), + Client: agentsdk.New(coderURL, agentsdk.WithFixedToken(token)), + metrics: metrics, } } func (c *instrumentedClient) PostLogSource(ctx context.Context, req agentsdk.PostLogSourceRequest) (codersdk.WorkspaceAgentLogSource, error) { resp, err := c.Client.PostLogSource(ctx, req) - record("PostLogSource", err) + c.metrics.record(methodPostLogSource, err) return resp, err } @@ -66,21 +92,16 @@ func (c *instrumentedClient) PostLogSource(ctx context.Context, req agentsdk.Pos func (c *instrumentedClient) connectLogDest(ctx context.Context, supportsRole bool) (agentsdk.LogDest, drpc.Conn, error) { if supportsRole { arpc, _, err := c.ConnectRPC28WithRole(ctx, "logstream-kube") - record("ConnectRPC", err) + c.metrics.record(methodConnectRPC, err) if err != nil { return nil, nil, err } return arpc, arpc.DRPCConn(), nil } arpc, err := c.ConnectRPC20(ctx) - record("ConnectRPC", err) + c.metrics.record(methodConnectRPC, err) if err != nil { return nil, nil, err } return arpc, arpc.DRPCConn(), nil } - -// recordSendResult records the result of a log send operation. -func recordSendResult(err error) { - record("SendLog", err) -} diff --git a/metrics_test.go b/metrics_test.go index 5049ea8..5bd8548 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -25,35 +25,39 @@ func getCounterValue(t *testing.T, cv *prometheus.CounterVec, labels ...string) func TestMetricsIncrement(t *testing.T) { t.Parallel() - // Record baseline values (metrics are global and may have been - // incremented by other tests running in the same process). - baseSuccess := getCounterValue(t, requestsTotal, "PostLogSource", "success") - baseFailure := getCounterValue(t, requestsTotal, "PostLogSource", "failure") - baseSendSuccess := getCounterValue(t, requestsTotal, "SendLog", "success") + m := newMetricsCollector() - // Simulate success via record helper - record("PostLogSource", nil) - require.Equal(t, baseSuccess+1, getCounterValue(t, requestsTotal, "PostLogSource", "success")) + // All counters start at zero. + require.Equal(t, float64(0), getCounterValue(t, m.requestsTotal, "PostLogSource", "success")) + require.Equal(t, float64(0), getCounterValue(t, m.requestsTotal, "PostLogSource", "failure")) + require.Equal(t, float64(0), getCounterValue(t, m.requestsTotal, "SendLog", "success")) - // Simulate failure via record helper - record("PostLogSource", io.ErrUnexpectedEOF) - require.Equal(t, baseFailure+1, getCounterValue(t, requestsTotal, "PostLogSource", "failure")) + // Simulate success + m.record(methodPostLogSource, nil) + require.Equal(t, float64(1), getCounterValue(t, m.requestsTotal, "PostLogSource", "success")) + + // Simulate failure + m.record(methodPostLogSource, io.ErrUnexpectedEOF) + require.Equal(t, float64(1), getCounterValue(t, m.requestsTotal, "PostLogSource", "failure")) // Simulate send success - recordSendResult(nil) - require.Equal(t, baseSendSuccess+1, getCounterValue(t, requestsTotal, "SendLog", "success")) + m.record(methodSendLog, nil) + require.Equal(t, float64(1), getCounterValue(t, m.requestsTotal, "SendLog", "success")) } func TestMetricsHandler(t *testing.T) { t.Parallel() - handler := metricsHandler() + m := newMetricsCollector() + handler := m.handler() require.NotNil(t, handler) } func TestMetricsEndpoint(t *testing.T) { t.Parallel() + m := newMetricsCollector() + // Pick a random free port. listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) @@ -61,7 +65,7 @@ func TestMetricsEndpoint(t *testing.T) { _ = listener.Close() mux := http.NewServeMux() - mux.Handle("/metrics", metricsHandler()) + mux.Handle("/metrics", m.handler()) srv := &http.Server{Addr: addr, Handler: mux} go func() { _ = srv.ListenAndServe() }() t.Cleanup(func() { _ = srv.Close() }) @@ -77,7 +81,7 @@ func TestMetricsEndpoint(t *testing.T) { }, 2*time.Second, 50*time.Millisecond) // Bump a counter and verify it appears in the output. - record("PostLogSource", nil) + m.record(methodPostLogSource, nil) resp, err := http.Get("http://" + addr + "/metrics") require.NoError(t, err) From f4b1ebee14f7de3411a13f187614e367f6abb823 Mon Sep 17 00:00:00 2001 From: Kacper Sawicki Date: Wed, 4 Mar 2026 09:50:36 +0000 Subject: [PATCH 9/9] fix: goimports formatting in logger_test.go --- logger_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/logger_test.go b/logger_test.go index ee6c469..92bd994 100644 --- a/logger_test.go +++ b/logger_test.go @@ -662,7 +662,7 @@ func Test_logQueuer(t *testing.T) { ch := make(chan agentLog) lq := &logQueuer{ - metrics: newMetricsCollector(), + metrics: newMetricsCollector(), logger: slogtest.Make(t, nil), clock: clock, q: ch, @@ -729,7 +729,7 @@ func Test_logQueuer(t *testing.T) { IgnoreErrors: true, }) lq := &logQueuer{ - metrics: newMetricsCollector(), + metrics: newMetricsCollector(), logger: logger, clock: clock, q: ch, @@ -818,9 +818,9 @@ func Test_logQueuer(t *testing.T) { ch := make(chan agentLog, 10) lq := &logQueuer{ metrics: newMetricsCollector(), - logger: slogtest.Make(t, nil), - clock: clock, - q: ch, + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, logCache: logCache{ logs: map[string][]agentsdk.Log{}, }, @@ -858,9 +858,9 @@ func Test_logQueuer(t *testing.T) { ch := make(chan agentLog, 10) lq := &logQueuer{ metrics: newMetricsCollector(), - logger: slogtest.Make(t, nil), - clock: clock, - q: ch, + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, logCache: logCache{ logs: map[string][]agentsdk.Log{}, }, @@ -894,7 +894,7 @@ func Test_logQueuer(t *testing.T) { IgnoreErrors: true, }) lq := &logQueuer{ - metrics: newMetricsCollector(), + metrics: newMetricsCollector(), logger: logger, clock: clock, q: ch, @@ -1103,7 +1103,7 @@ func Test_logCache(t *testing.T) { ch := make(chan agentLog, 10) lq := &logQueuer{ - metrics: newMetricsCollector(), + metrics: newMetricsCollector(), logger: slogtest.Make(t, nil), clock: clock, q: ch,