diff --git a/go.mod b/go.mod index 14a2e20..9224526 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,8 @@ require ( github.com/go-chi/chi/v5 v5.2.2 github.com/google/uuid v1.6.0 github.com/hashicorp/yamux v0.1.2 + github.com/prometheus/client_golang v1.23.2 + github.com/prometheus/client_model v0.6.2 github.com/stretchr/testify v1.11.1 golang.org/x/mod v0.33.0 google.golang.org/protobuf v1.36.11 @@ -177,8 +179,6 @@ require ( github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect - github.com/prometheus/client_golang v1.23.2 // indirect - github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.67.4 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect diff --git a/helm/templates/service.yaml b/helm/templates/service.yaml index 23473f5..eb10b23 100644 --- a/helm/templates/service.yaml +++ b/helm/templates/service.yaml @@ -82,6 +82,12 @@ spec: {{- with .Values.labels }} {{- toYaml . | nindent 8 }} {{- end }} + {{- if .Values.metrics.enabled }} + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: {{ .Values.metrics.port | quote }} + prometheus.io/path: "/metrics" + {{- end }} spec: serviceAccountName: {{ .Values.serviceAccount.name | quote }} restartPolicy: Always @@ -111,10 +117,23 @@ spec: imagePullPolicy: {{ .Values.image.pullPolicy }} command: - /coder-logstream-kube + {{- if .Values.metrics.enabled }} + ports: + - name: metrics + containerPort: {{ .Values.metrics.port }} + protocol: TCP + {{- end }} resources: {{ toYaml .Values.resources | nindent 12 }} env: - name: CODER_URL value: {{ .Values.url }} + {{- if .Values.metrics.enabled }} + - name: CODER_LOGSTREAM_METRICS_ADDR + value: ":{{ .Values.metrics.port }}" + {{- else }} + - name: CODER_LOGSTREAM_METRICS_ADDR + value: "" + {{- end }} {{- if .Values.namespaces }} - name: CODER_NAMESPACES value: {{ join "," .Values.namespaces }} diff --git a/helm/values.yaml b/helm/values.yaml index daa847f..e667225 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -86,6 +86,13 @@ tolerations: # value: "value" # effect: "NoSchedule" +# metrics -- Prometheus metrics configuration. +metrics: + # metrics.enabled -- Whether to expose a Prometheus /metrics endpoint. + enabled: false + # metrics.port -- The port to serve Prometheus metrics on. + port: 9100 + # labels -- The pod labels for coder-logstream-kube. See: # https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ labels: {} diff --git a/logger.go b/logger.go index 0d0a7a4..8cbd050 100644 --- a/logger.go +++ b/logger.go @@ -21,7 +21,6 @@ import ( "cdr.dev/slog/v3" "golang.org/x/mod/semver" - "storj.io/drpc" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" @@ -43,6 +42,8 @@ type podEventLoggerOptions struct { // maxRetries is the maximum number of retries for a log send failure. maxRetries int + metrics *metricsCollector + // The following fields are optional! namespaces []string fieldSelector string @@ -63,6 +64,9 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve if opts.maxRetries == 0 { opts.maxRetries = 10 } + if opts.metrics == nil { + opts.metrics = newMetricsCollector() + } logCh := make(chan agentLog, 512) ctx, cancelFunc := context.WithCancel(ctx) @@ -88,6 +92,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve logs: map[string][]agentsdk.Log{}, }, maxRetries: opts.maxRetries, + metrics: opts.metrics, }, doneChan: make(chan struct{}), } @@ -520,6 +525,8 @@ type logQueuer struct { retries map[string]*retryState // maxRetries is the maximum number of retries for a log send failure. maxRetries int + + metrics *metricsCollector } func (l *logQueuer) work(ctx context.Context, done chan struct{}) { @@ -557,7 +564,7 @@ func (l *logQueuer) cleanup() { } func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) { - client := agentsdk.New(l.coderURL, agentsdk.WithFixedToken(log.agentToken)) + client := newInstrumentedClient(l.coderURL, log.agentToken, l.metrics) logger := l.logger.With(slog.F("resource_name", log.resourceName)) client.SDK.SetLogger(logger) @@ -590,28 +597,11 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLif } supportsRole := buildInfoErr == nil && versionAtLeast(buildInfo.Version, "v2.31.0") - var ( - logDest agentsdk.LogDest - rpcConn drpc.Conn - ) - if supportsRole { - arpc, _, err := client.ConnectRPC28WithRole(gracefulCtx, "logstream-kube") - if err != nil { - logger.Error(ctx, "drpc connect with role", slog.Error(err)) - gracefulCancel() - return agentLoggerLifecycle{}, err - } - logDest = arpc - rpcConn = arpc.DRPCConn() - } else { - arpc, err := client.ConnectRPC20(gracefulCtx) - if err != nil { - logger.Error(ctx, "drpc connect", slog.Error(err)) - gracefulCancel() - return agentLoggerLifecycle{}, err - } - logDest = arpc - rpcConn = arpc.DRPCConn() + logDest, rpcConn, err := client.connectLogDest(gracefulCtx, supportsRole) + if err != nil { + logger.Error(ctx, "drpc connect", slog.Error(err)) + gracefulCancel() + return agentLoggerLifecycle{}, err } go func() { err := ls.SendLoop(gracefulCtx, logDest) @@ -690,7 +680,9 @@ func (l *logQueuer) processLog(ctx context.Context, log agentLog) { if len(queuedLogs) == 0 { return } - if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil { + sendErr := lgr.scriptLogger.Send(ctx, queuedLogs...) + l.metrics.record(methodSendLog, sendErr) + if sendErr != nil { l.scheduleRetry(ctx, log.agentToken) return } diff --git a/logger_test.go b/logger_test.go index 8a76de5..92bd994 100644 --- a/logger_test.go +++ b/logger_test.go @@ -662,6 +662,7 @@ func Test_logQueuer(t *testing.T) { ch := make(chan agentLog) lq := &logQueuer{ + metrics: newMetricsCollector(), logger: slogtest.Make(t, nil), clock: clock, q: ch, @@ -728,6 +729,7 @@ func Test_logQueuer(t *testing.T) { IgnoreErrors: true, }) lq := &logQueuer{ + metrics: newMetricsCollector(), logger: logger, clock: clock, q: ch, @@ -815,9 +817,10 @@ func Test_logQueuer(t *testing.T) { clock := quartz.NewMock(t) ch := make(chan agentLog, 10) lq := &logQueuer{ - logger: slogtest.Make(t, nil), - clock: clock, - q: ch, + metrics: newMetricsCollector(), + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, logCache: logCache{ logs: map[string][]agentsdk.Log{}, }, @@ -854,9 +857,10 @@ func Test_logQueuer(t *testing.T) { clock := quartz.NewMock(t) ch := make(chan agentLog, 10) lq := &logQueuer{ - logger: slogtest.Make(t, nil), - clock: clock, - q: ch, + metrics: newMetricsCollector(), + logger: slogtest.Make(t, nil), + clock: clock, + q: ch, logCache: logCache{ logs: map[string][]agentsdk.Log{}, }, @@ -890,6 +894,7 @@ func Test_logQueuer(t *testing.T) { IgnoreErrors: true, }) lq := &logQueuer{ + metrics: newMetricsCollector(), logger: logger, clock: clock, q: ch, @@ -1098,6 +1103,7 @@ func Test_logCache(t *testing.T) { ch := make(chan agentLog, 10) lq := &logQueuer{ + metrics: newMetricsCollector(), logger: slogtest.Make(t, nil), clock: clock, q: ch, diff --git a/main.go b/main.go index 974ad0f..d644a7b 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "errors" "fmt" + "net/http" "net/url" "os" "strings" @@ -30,6 +31,7 @@ func root() *serpent.Command { kubeConfig string namespacesStr string labelSelector string + metricsAddr string ) cmd := &serpent.Command{ Use: "coder-logstream-kube", @@ -73,6 +75,14 @@ func root() *serpent.Command { Value: serpent.StringOf(&labelSelector), Description: "Label selector to use when listing pods.", }, + { + Name: "metrics-addr", + Flag: "metrics-addr", + Env: "CODER_LOGSTREAM_METRICS_ADDR", + Default: "", + Value: serpent.StringOf(&metricsAddr), + Description: "Address to serve Prometheus metrics on. Set to empty to disable.", + }, }, Handler: func(inv *serpent.Invocation) error { if coderURL == "" { @@ -115,14 +125,18 @@ func root() *serpent.Command { } } + logger := slog.Make(sloghuman.Sink(inv.Stderr)).Leveled(slog.LevelDebug) + metrics := newMetricsCollector() + reporter, err := newPodEventLogger(inv.Context(), podEventLoggerOptions{ coderURL: parsedURL, client: client, namespaces: namespaces, fieldSelector: fieldSelector, labelSelector: labelSelector, - logger: slog.Make(sloghuman.Sink(inv.Stderr)).Leveled(slog.LevelDebug), + logger: logger, maxRetries: 15, // 15 retries is the default max retries for a log send failure. + metrics: metrics, }) if err != nil { return fmt.Errorf("create pod event reporter: %w", err) @@ -130,6 +144,17 @@ func root() *serpent.Command { defer func() { _ = reporter.Close() }() + + if metricsAddr != "" { + mux := http.NewServeMux() + mux.Handle("/metrics", metrics.handler()) + go func() { + if err := http.ListenAndServe(metricsAddr, mux); err != nil { + logger.Error(inv.Context(), "metrics server failed", slog.Error(err)) + } + }() + } + select { case err := <-reporter.errChan: return fmt.Errorf("pod event reporter: %w", err) diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..d023f8d --- /dev/null +++ b/metrics.go @@ -0,0 +1,107 @@ +package main + +import ( + "context" + "net/http" + "net/url" + + "github.com/coder/coder/v2/codersdk" + "github.com/coder/coder/v2/codersdk/agentsdk" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "storj.io/drpc" +) + +// requestMethod identifies the API method being called. +type requestMethod string + +const ( + methodPostLogSource requestMethod = "PostLogSource" + methodConnectRPC requestMethod = "ConnectRPC" + methodSendLog requestMethod = "SendLog" +) + +// allMethods is used to pre-initialize all label combinations. +var allMethods = []requestMethod{methodPostLogSource, methodConnectRPC, methodSendLog} + +// metricsCollector holds Prometheus metrics for the application. +// It uses a custom registry so metrics are not global, making +// tests deterministic and avoiding flakes from parallel execution. +type metricsCollector struct { + registry *prometheus.Registry + requestsTotal *prometheus.CounterVec +} + +func newMetricsCollector() *metricsCollector { + requestsTotal := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "coder_logstream_requests_total", + Help: "Total number of requests to the Coder API.", + }, []string{"method", "status"}) + + registry := prometheus.NewRegistry() + registry.MustRegister(requestsTotal) + + // Initialize all label combinations so they appear in /metrics at zero. + for _, method := range allMethods { + requestsTotal.WithLabelValues(string(method), "success") + requestsTotal.WithLabelValues(string(method), "failure") + } + + return &metricsCollector{ + registry: registry, + requestsTotal: requestsTotal, + } +} + +func (m *metricsCollector) handler() http.Handler { + return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}) +} + +// record increments the appropriate request counter. +func (m *metricsCollector) record(method requestMethod, err error) { + if err != nil { + m.requestsTotal.WithLabelValues(string(method), "failure").Inc() + } else { + m.requestsTotal.WithLabelValues(string(method), "success").Inc() + } +} + +// instrumentedClient wraps agentsdk.Client to record Prometheus metrics +// on every API call. This keeps metric instrumentation in one place +// rather than scattered across call sites. +type instrumentedClient struct { + *agentsdk.Client + metrics *metricsCollector +} + +func newInstrumentedClient(coderURL *url.URL, token string, metrics *metricsCollector) *instrumentedClient { + return &instrumentedClient{ + Client: agentsdk.New(coderURL, agentsdk.WithFixedToken(token)), + metrics: metrics, + } +} + +func (c *instrumentedClient) PostLogSource(ctx context.Context, req agentsdk.PostLogSourceRequest) (codersdk.WorkspaceAgentLogSource, error) { + resp, err := c.Client.PostLogSource(ctx, req) + c.metrics.record(methodPostLogSource, err) + return resp, err +} + +// connectLogDest establishes the appropriate RPC connection based on +// server capabilities, recording metrics for the attempt. +func (c *instrumentedClient) connectLogDest(ctx context.Context, supportsRole bool) (agentsdk.LogDest, drpc.Conn, error) { + if supportsRole { + arpc, _, err := c.ConnectRPC28WithRole(ctx, "logstream-kube") + c.metrics.record(methodConnectRPC, err) + if err != nil { + return nil, nil, err + } + return arpc, arpc.DRPCConn(), nil + } + arpc, err := c.ConnectRPC20(ctx) + c.metrics.record(methodConnectRPC, err) + if err != nil { + return nil, nil, err + } + return arpc, arpc.DRPCConn(), nil +} diff --git a/metrics_test.go b/metrics_test.go new file mode 100644 index 0000000..5bd8548 --- /dev/null +++ b/metrics_test.go @@ -0,0 +1,95 @@ +package main + +import ( + "io" + "net" + "net/http" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +func getCounterValue(t *testing.T, cv *prometheus.CounterVec, labels ...string) float64 { + t.Helper() + m := &dto.Metric{} + c, err := cv.GetMetricWithLabelValues(labels...) + require.NoError(t, err) + require.NoError(t, c.Write(m)) + return m.GetCounter().GetValue() +} + +func TestMetricsIncrement(t *testing.T) { + t.Parallel() + + m := newMetricsCollector() + + // All counters start at zero. + require.Equal(t, float64(0), getCounterValue(t, m.requestsTotal, "PostLogSource", "success")) + require.Equal(t, float64(0), getCounterValue(t, m.requestsTotal, "PostLogSource", "failure")) + require.Equal(t, float64(0), getCounterValue(t, m.requestsTotal, "SendLog", "success")) + + // Simulate success + m.record(methodPostLogSource, nil) + require.Equal(t, float64(1), getCounterValue(t, m.requestsTotal, "PostLogSource", "success")) + + // Simulate failure + m.record(methodPostLogSource, io.ErrUnexpectedEOF) + require.Equal(t, float64(1), getCounterValue(t, m.requestsTotal, "PostLogSource", "failure")) + + // Simulate send success + m.record(methodSendLog, nil) + require.Equal(t, float64(1), getCounterValue(t, m.requestsTotal, "SendLog", "success")) +} + +func TestMetricsHandler(t *testing.T) { + t.Parallel() + + m := newMetricsCollector() + handler := m.handler() + require.NotNil(t, handler) +} + +func TestMetricsEndpoint(t *testing.T) { + t.Parallel() + + m := newMetricsCollector() + + // Pick a random free port. + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + addr := listener.Addr().String() + _ = listener.Close() + + mux := http.NewServeMux() + mux.Handle("/metrics", m.handler()) + srv := &http.Server{Addr: addr, Handler: mux} + go func() { _ = srv.ListenAndServe() }() + t.Cleanup(func() { _ = srv.Close() }) + + // Wait for the server to be ready. + require.Eventually(t, func() bool { + resp, err := http.Get("http://" + addr + "/metrics") + if err != nil { + return false + } + _ = resp.Body.Close() + return resp.StatusCode == http.StatusOK + }, 2*time.Second, 50*time.Millisecond) + + // Bump a counter and verify it appears in the output. + m.record(methodPostLogSource, nil) + + resp, err := http.Get("http://" + addr + "/metrics") + require.NoError(t, err) + defer func() { _ = resp.Body.Close() }() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + require.True(t, strings.Contains(string(body), "coder_logstream_requests_total"), + "expected coder_logstream_requests_total in metrics output") +}