Skip to content
Merged
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ require (
github.com/go-chi/chi/v5 v5.2.2
github.com/google/uuid v1.6.0
github.com/hashicorp/yamux v0.1.2
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/stretchr/testify v1.11.1
golang.org/x/mod v0.33.0
google.golang.org/protobuf v1.36.11
Expand Down Expand Up @@ -177,8 +179,6 @@ require (
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/prometheus/client_golang v1.23.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.4 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
Expand Down
19 changes: 19 additions & 0 deletions helm/templates/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ spec:
{{- with .Values.labels }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- if .Values.metrics.enabled }}
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: {{ .Values.metrics.port | quote }}
prometheus.io/path: "/metrics"
{{- end }}
spec:
serviceAccountName: {{ .Values.serviceAccount.name | quote }}
restartPolicy: Always
Expand Down Expand Up @@ -111,10 +117,23 @@ spec:
imagePullPolicy: {{ .Values.image.pullPolicy }}
command:
- /coder-logstream-kube
{{- if .Values.metrics.enabled }}
ports:
- name: metrics
containerPort: {{ .Values.metrics.port }}
protocol: TCP
{{- end }}
resources: {{ toYaml .Values.resources | nindent 12 }}
env:
- name: CODER_URL
value: {{ .Values.url }}
{{- if .Values.metrics.enabled }}
- name: CODER_LOGSTREAM_METRICS_ADDR
value: ":{{ .Values.metrics.port }}"
{{- else }}
- name: CODER_LOGSTREAM_METRICS_ADDR
value: ""
{{- end }}
{{- if .Values.namespaces }}
- name: CODER_NAMESPACES
value: {{ join "," .Values.namespaces }}
Expand Down
7 changes: 7 additions & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ tolerations:
# value: "value"
# effect: "NoSchedule"

# metrics -- Prometheus metrics configuration.
metrics:
# metrics.enabled -- Whether to expose a Prometheus /metrics endpoint.
enabled: false
# metrics.port -- The port to serve Prometheus metrics on.
port: 9100

# labels -- The pod labels for coder-logstream-kube. See:
# https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
labels: {}
Expand Down
42 changes: 17 additions & 25 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"cdr.dev/slog/v3"
"golang.org/x/mod/semver"
"storj.io/drpc"

"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
Expand All @@ -43,6 +42,8 @@ type podEventLoggerOptions struct {
// maxRetries is the maximum number of retries for a log send failure.
maxRetries int

metrics *metricsCollector

// The following fields are optional!
namespaces []string
fieldSelector string
Expand All @@ -63,6 +64,9 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
if opts.maxRetries == 0 {
opts.maxRetries = 10
}
if opts.metrics == nil {
opts.metrics = newMetricsCollector()
}

logCh := make(chan agentLog, 512)
ctx, cancelFunc := context.WithCancel(ctx)
Expand All @@ -88,6 +92,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
logs: map[string][]agentsdk.Log{},
},
maxRetries: opts.maxRetries,
metrics: opts.metrics,
},
doneChan: make(chan struct{}),
}
Expand Down Expand Up @@ -520,6 +525,8 @@ type logQueuer struct {
retries map[string]*retryState
// maxRetries is the maximum number of retries for a log send failure.
maxRetries int

metrics *metricsCollector
}

func (l *logQueuer) work(ctx context.Context, done chan struct{}) {
Expand Down Expand Up @@ -557,7 +564,7 @@ func (l *logQueuer) cleanup() {
}

func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) {
client := agentsdk.New(l.coderURL, agentsdk.WithFixedToken(log.agentToken))
client := newInstrumentedClient(l.coderURL, log.agentToken, l.metrics)

logger := l.logger.With(slog.F("resource_name", log.resourceName))
client.SDK.SetLogger(logger)
Expand Down Expand Up @@ -590,28 +597,11 @@ func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLif
}
supportsRole := buildInfoErr == nil && versionAtLeast(buildInfo.Version, "v2.31.0")

var (
logDest agentsdk.LogDest
rpcConn drpc.Conn
)
if supportsRole {
arpc, _, err := client.ConnectRPC28WithRole(gracefulCtx, "logstream-kube")
if err != nil {
logger.Error(ctx, "drpc connect with role", slog.Error(err))
gracefulCancel()
return agentLoggerLifecycle{}, err
}
logDest = arpc
rpcConn = arpc.DRPCConn()
} else {
arpc, err := client.ConnectRPC20(gracefulCtx)
if err != nil {
logger.Error(ctx, "drpc connect", slog.Error(err))
gracefulCancel()
return agentLoggerLifecycle{}, err
}
logDest = arpc
rpcConn = arpc.DRPCConn()
logDest, rpcConn, err := client.connectLogDest(gracefulCtx, supportsRole)
if err != nil {
logger.Error(ctx, "drpc connect", slog.Error(err))
gracefulCancel()
return agentLoggerLifecycle{}, err
}
go func() {
err := ls.SendLoop(gracefulCtx, logDest)
Expand Down Expand Up @@ -690,7 +680,9 @@ func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
if len(queuedLogs) == 0 {
return
}
if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil {
sendErr := lgr.scriptLogger.Send(ctx, queuedLogs...)
l.metrics.record(methodSendLog, sendErr)
if sendErr != nil {
l.scheduleRetry(ctx, log.agentToken)
return
}
Expand Down
18 changes: 12 additions & 6 deletions logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ func Test_logQueuer(t *testing.T) {

ch := make(chan agentLog)
lq := &logQueuer{
metrics: newMetricsCollector(),
logger: slogtest.Make(t, nil),
clock: clock,
q: ch,
Expand Down Expand Up @@ -728,6 +729,7 @@ func Test_logQueuer(t *testing.T) {
IgnoreErrors: true,
})
lq := &logQueuer{
metrics: newMetricsCollector(),
logger: logger,
clock: clock,
q: ch,
Expand Down Expand Up @@ -815,9 +817,10 @@ func Test_logQueuer(t *testing.T) {
clock := quartz.NewMock(t)
ch := make(chan agentLog, 10)
lq := &logQueuer{
logger: slogtest.Make(t, nil),
clock: clock,
q: ch,
metrics: newMetricsCollector(),
logger: slogtest.Make(t, nil),
clock: clock,
q: ch,
logCache: logCache{
logs: map[string][]agentsdk.Log{},
},
Expand Down Expand Up @@ -854,9 +857,10 @@ func Test_logQueuer(t *testing.T) {
clock := quartz.NewMock(t)
ch := make(chan agentLog, 10)
lq := &logQueuer{
logger: slogtest.Make(t, nil),
clock: clock,
q: ch,
metrics: newMetricsCollector(),
logger: slogtest.Make(t, nil),
clock: clock,
q: ch,
logCache: logCache{
logs: map[string][]agentsdk.Log{},
},
Expand Down Expand Up @@ -890,6 +894,7 @@ func Test_logQueuer(t *testing.T) {
IgnoreErrors: true,
})
lq := &logQueuer{
metrics: newMetricsCollector(),
logger: logger,
clock: clock,
q: ch,
Expand Down Expand Up @@ -1098,6 +1103,7 @@ func Test_logCache(t *testing.T) {

ch := make(chan agentLog, 10)
lq := &logQueuer{
metrics: newMetricsCollector(),
logger: slogtest.Make(t, nil),
clock: clock,
q: ch,
Expand Down
27 changes: 26 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"errors"
"fmt"
"net/http"
"net/url"
"os"
"strings"
Expand Down Expand Up @@ -30,6 +31,7 @@ func root() *serpent.Command {
kubeConfig string
namespacesStr string
labelSelector string
metricsAddr string
)
cmd := &serpent.Command{
Use: "coder-logstream-kube",
Expand Down Expand Up @@ -73,6 +75,14 @@ func root() *serpent.Command {
Value: serpent.StringOf(&labelSelector),
Description: "Label selector to use when listing pods.",
},
{
Name: "metrics-addr",
Flag: "metrics-addr",
Env: "CODER_LOGSTREAM_METRICS_ADDR",
Default: "",
Value: serpent.StringOf(&metricsAddr),
Description: "Address to serve Prometheus metrics on. Set to empty to disable.",
},
},
Handler: func(inv *serpent.Invocation) error {
if coderURL == "" {
Expand Down Expand Up @@ -115,21 +125,36 @@ func root() *serpent.Command {
}
}

logger := slog.Make(sloghuman.Sink(inv.Stderr)).Leveled(slog.LevelDebug)
metrics := newMetricsCollector()

reporter, err := newPodEventLogger(inv.Context(), podEventLoggerOptions{
coderURL: parsedURL,
client: client,
namespaces: namespaces,
fieldSelector: fieldSelector,
labelSelector: labelSelector,
logger: slog.Make(sloghuman.Sink(inv.Stderr)).Leveled(slog.LevelDebug),
logger: logger,
maxRetries: 15, // 15 retries is the default max retries for a log send failure.
metrics: metrics,
})
if err != nil {
return fmt.Errorf("create pod event reporter: %w", err)
}
defer func() {
_ = reporter.Close()
}()

if metricsAddr != "" {
mux := http.NewServeMux()
mux.Handle("/metrics", metrics.handler())
go func() {
if err := http.ListenAndServe(metricsAddr, mux); err != nil {
logger.Error(inv.Context(), "metrics server failed", slog.Error(err))
}
}()
}

select {
case err := <-reporter.errChan:
return fmt.Errorf("pod event reporter: %w", err)
Expand Down
Loading