diff --git a/cmd/activator/main.go b/cmd/activator/main.go index 8593ccbed5dd..b6e2cd963e8c 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -205,7 +205,8 @@ func main() { logger.Info("Connecting to Autoscaler at ", autoscalerEndpoint) statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger) defer statSink.Shutdown() - go activator.ReportStats(logger, statSink, statCh) + go activator.ReportStats(logger, statSink, statCh, mp) + go activator.AutoscalerConnectionStatusMonitor(ctx, logger, statSink, mp) // Create and run our concurrency reporter concurrencyReporter := activatorhandler.NewConcurrencyReporter(ctx, env.PodName, statCh, mp) diff --git a/pkg/activator/metrics.go b/pkg/activator/metrics.go new file mode 100644 index 000000000000..3571462c752e --- /dev/null +++ b/pkg/activator/metrics.go @@ -0,0 +1,53 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package activator + +import ( + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" +) + +var scopeName = "knative.dev/serving/pkg/activator" + +type statReporterMetrics struct { + autoscalerReachable metric.Int64Gauge +} + +func newStatReporterMetrics(mp metric.MeterProvider) *statReporterMetrics { + var ( + m statReporterMetrics + err error + provider = mp + ) + + if provider == nil { + provider = otel.GetMeterProvider() + } + + meter := provider.Meter(scopeName) + + m.autoscalerReachable, err = meter.Int64Gauge( + "kn.activator.autoscaler.reachable", + metric.WithDescription("Whether the autoscaler is reachable from the activator (1 = reachable, 0 = not reachable)"), + metric.WithUnit("{reachable}"), + ) + if err != nil { + panic(err) + } + + return &m +} diff --git a/pkg/activator/stat_reporter.go b/pkg/activator/stat_reporter.go index bfd60719c955..a44894f1c823 100644 --- a/pkg/activator/stat_reporter.go +++ b/pkg/activator/stat_reporter.go @@ -17,9 +17,18 @@ limitations under the License. package activator import ( + "context" + "time" + "github.com/gorilla/websocket" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" - "knative.dev/serving/pkg/autoscaler/metrics" + asmetrics "knative.dev/serving/pkg/autoscaler/metrics" +) + +const ( + // connectionCheckInterval is how often to check the autoscaler connection status. + connectionCheckInterval = 5 * time.Second ) // RawSender sends raw byte array messages with a message type @@ -28,13 +37,42 @@ type RawSender interface { SendRaw(msgType int, msg []byte) error } +// StatusChecker checks the connection status. +type StatusChecker interface { + Status() error +} + +// AutoscalerConnectionStatusMonitor periodically checks if the autoscaler is reachable +// and emits metrics and logs accordingly. +func AutoscalerConnectionStatusMonitor(ctx context.Context, logger *zap.SugaredLogger, conn StatusChecker, mp metric.MeterProvider) { + metrics := newStatReporterMetrics(mp) + ticker := time.NewTicker(connectionCheckInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := conn.Status(); err != nil { + logger.Errorw("Autoscaler is not reachable from activator.", + zap.Error(err)) + metrics.autoscalerReachable.Record(context.Background(), 0) + } else { + metrics.autoscalerReachable.Record(context.Background(), 1) + } + } + } +} + // ReportStats sends any messages received on the source channel to the sink. // The messages are sent on a goroutine to avoid blocking, which means that // messages may arrive out of order. -func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []metrics.StatMessage) { +func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []asmetrics.StatMessage, mp metric.MeterProvider) { + metrics := newStatReporterMetrics(mp) for sms := range source { - go func(sms []metrics.StatMessage) { - wsms := metrics.ToWireStatMessages(sms) + go func(sms []asmetrics.StatMessage) { + wsms := asmetrics.ToWireStatMessages(sms) b, err := wsms.Marshal() if err != nil { logger.Errorw("Error while marshaling stats", zap.Error(err)) @@ -42,7 +80,12 @@ func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []metr } if err := sink.SendRaw(websocket.BinaryMessage, b); err != nil { - logger.Errorw("Error while sending stats", zap.Error(err)) + logger.Errorw("Autoscaler is not reachable from activator. Stats were not sent.", + zap.Error(err), + zap.Int("stat_message_count", len(sms))) + metrics.autoscalerReachable.Record(context.Background(), 0) + } else { + metrics.autoscalerReachable.Record(context.Background(), 1) } }(sms) } diff --git a/pkg/activator/stat_reporter_test.go b/pkg/activator/stat_reporter_test.go index 785f28d1fa4b..d480686d80a1 100644 --- a/pkg/activator/stat_reporter_test.go +++ b/pkg/activator/stat_reporter_test.go @@ -17,6 +17,8 @@ limitations under the License. package activator import ( + "context" + "errors" "testing" "time" @@ -43,7 +45,7 @@ func TestReportStats(t *testing.T) { }) defer close(ch) - go ReportStats(logger, sink, ch) + go ReportStats(logger, sink, ch, nil) inputs := [][]metrics.StatMessage{{{ Key: types.NamespacedName{Name: "first-a"}, @@ -95,3 +97,73 @@ type sendRawFunc func(msgType int, msg []byte) error func (fn sendRawFunc) SendRaw(msgType int, msg []byte) error { return fn(msgType, msg) } + +type statusCheckerFunc func() error + +func (fn statusCheckerFunc) Status() error { + return fn() +} + +func TestReportStatsSendFailure(t *testing.T) { + logger := logtesting.TestLogger(t) + ch := make(chan []metrics.StatMessage) + + sendErr := errors.New("connection refused") + errorReceived := make(chan struct{}) + sink := sendRawFunc(func(msgType int, msg []byte) error { + close(errorReceived) + return sendErr + }) + + defer close(ch) + go ReportStats(logger, sink, ch, nil) + + // Send a stat message + ch <- []metrics.StatMessage{{ + Key: types.NamespacedName{Name: "test-revision"}, + }} + + // Wait for the error to be processed + select { + case <-errorReceived: + // Success - the error path was executed + case <-time.After(2 * time.Second): + t.Fatal("SendRaw was not called within timeout") + } + + // Give some time for the goroutine to process the error and log + time.Sleep(100 * time.Millisecond) +} + +func TestAutoscalerConnectionStatusMonitor(t *testing.T) { + tests := []struct { + name string + statusErr error + }{{ + name: "connection established", + statusErr: nil, + }, { + name: "connection not established", + statusErr: errors.New("connection not established"), + }} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger := logtesting.TestLogger(t) + ctx, cancel := context.WithCancel(context.Background()) + + checker := statusCheckerFunc(func() error { + return tt.statusErr + }) + + // Start the monitor + go AutoscalerConnectionStatusMonitor(ctx, logger, checker, nil) + + // Wait for at least one check to complete + time.Sleep(6 * time.Second) + + // Cancel context to stop the monitor + cancel() + }) + } +}