Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ func main() {
logger.Info("Connecting to Autoscaler at ", autoscalerEndpoint)
statSink := websocket.NewDurableSendingConnection(autoscalerEndpoint, logger)
defer statSink.Shutdown()
go activator.ReportStats(logger, statSink, statCh)
go activator.ReportStats(logger, statSink, statCh, mp)
go activator.AutoscalerConnectionStatusMonitor(ctx, logger, statSink, mp)

// Create and run our concurrency reporter
concurrencyReporter := activatorhandler.NewConcurrencyReporter(ctx, env.PodName, statCh, mp)
Expand Down
53 changes: 53 additions & 0 deletions pkg/activator/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package activator

import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

var scopeName = "knative.dev/serving/pkg/activator"

type statReporterMetrics struct {
autoscalerReachable metric.Int64Gauge
}

func newStatReporterMetrics(mp metric.MeterProvider) *statReporterMetrics {
var (
m statReporterMetrics
err error
provider = mp
)

if provider == nil {
provider = otel.GetMeterProvider()
}

meter := provider.Meter(scopeName)

m.autoscalerReachable, err = meter.Int64Gauge(
"kn.activator.autoscaler.reachable",
metric.WithDescription("Whether the autoscaler is reachable from the activator (1 = reachable, 0 = not reachable)"),
metric.WithUnit("{reachable}"),
)
if err != nil {
panic(err)
}

return &m
}
53 changes: 48 additions & 5 deletions pkg/activator/stat_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,18 @@ limitations under the License.
package activator

import (
"context"
"time"

"github.com/gorilla/websocket"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"knative.dev/serving/pkg/autoscaler/metrics"
asmetrics "knative.dev/serving/pkg/autoscaler/metrics"
)

const (
// connectionCheckInterval is how often to check the autoscaler connection status.
connectionCheckInterval = 5 * time.Second
)

// RawSender sends raw byte array messages with a message type
Expand All @@ -28,21 +37,55 @@ type RawSender interface {
SendRaw(msgType int, msg []byte) error
}

// StatusChecker checks the connection status.
type StatusChecker interface {
Status() error
}

// AutoscalerConnectionStatusMonitor periodically checks if the autoscaler is reachable
// and emits metrics and logs accordingly.
func AutoscalerConnectionStatusMonitor(ctx context.Context, logger *zap.SugaredLogger, conn StatusChecker, mp metric.MeterProvider) {
metrics := newStatReporterMetrics(mp)
ticker := time.NewTicker(connectionCheckInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := conn.Status(); err != nil {
logger.Errorw("Autoscaler is not reachable from activator.",
zap.Error(err))
metrics.autoscalerReachable.Record(context.Background(), 0)
} else {
metrics.autoscalerReachable.Record(context.Background(), 1)
}
}
}
}

// ReportStats sends any messages received on the source channel to the sink.
// The messages are sent on a goroutine to avoid blocking, which means that
// messages may arrive out of order.
func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []metrics.StatMessage) {
func ReportStats(logger *zap.SugaredLogger, sink RawSender, source <-chan []asmetrics.StatMessage, mp metric.MeterProvider) {
metrics := newStatReporterMetrics(mp)
for sms := range source {
go func(sms []metrics.StatMessage) {
wsms := metrics.ToWireStatMessages(sms)
go func(sms []asmetrics.StatMessage) {
wsms := asmetrics.ToWireStatMessages(sms)
b, err := wsms.Marshal()
if err != nil {
logger.Errorw("Error while marshaling stats", zap.Error(err))
return
}

if err := sink.SendRaw(websocket.BinaryMessage, b); err != nil {
logger.Errorw("Error while sending stats", zap.Error(err))
logger.Errorw("Autoscaler is not reachable from activator. Stats were not sent.",
zap.Error(err),
zap.Int("stat_message_count", len(sms)))
metrics.autoscalerReachable.Record(context.Background(), 0)
} else {
metrics.autoscalerReachable.Record(context.Background(), 1)
}
}(sms)
}
Expand Down
74 changes: 73 additions & 1 deletion pkg/activator/stat_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package activator

import (
"context"
"errors"
"testing"
"time"

Expand All @@ -43,7 +45,7 @@ func TestReportStats(t *testing.T) {
})

defer close(ch)
go ReportStats(logger, sink, ch)
go ReportStats(logger, sink, ch, nil)

inputs := [][]metrics.StatMessage{{{
Key: types.NamespacedName{Name: "first-a"},
Expand Down Expand Up @@ -95,3 +97,73 @@ type sendRawFunc func(msgType int, msg []byte) error
func (fn sendRawFunc) SendRaw(msgType int, msg []byte) error {
return fn(msgType, msg)
}

type statusCheckerFunc func() error

func (fn statusCheckerFunc) Status() error {
return fn()
}

func TestReportStatsSendFailure(t *testing.T) {
logger := logtesting.TestLogger(t)
ch := make(chan []metrics.StatMessage)

sendErr := errors.New("connection refused")
errorReceived := make(chan struct{})
sink := sendRawFunc(func(msgType int, msg []byte) error {
close(errorReceived)
return sendErr
})

defer close(ch)
go ReportStats(logger, sink, ch, nil)

// Send a stat message
ch <- []metrics.StatMessage{{
Key: types.NamespacedName{Name: "test-revision"},
}}

// Wait for the error to be processed
select {
case <-errorReceived:
// Success - the error path was executed
case <-time.After(2 * time.Second):
t.Fatal("SendRaw was not called within timeout")
}

// Give some time for the goroutine to process the error and log
time.Sleep(100 * time.Millisecond)
}

func TestAutoscalerConnectionStatusMonitor(t *testing.T) {
tests := []struct {
name string
statusErr error
}{{
name: "connection established",
statusErr: nil,
}, {
name: "connection not established",
statusErr: errors.New("connection not established"),
}}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger := logtesting.TestLogger(t)
ctx, cancel := context.WithCancel(context.Background())

checker := statusCheckerFunc(func() error {
return tt.statusErr
})

// Start the monitor
go AutoscalerConnectionStatusMonitor(ctx, logger, checker, nil)

// Wait for at least one check to complete
time.Sleep(6 * time.Second)

// Cancel context to stop the monitor
cancel()
})
}
}
Loading