From ab581efeabb1b84cdd42b9359af170f3c1dbe504 Mon Sep 17 00:00:00 2001 From: "Jens W. Klein" Date: Wed, 8 Apr 2026 15:22:03 +0200 Subject: [PATCH 1/3] feat: add TokenProvider to proxy for agent BAN auth BAN requests are forwarded to the agent on port 9090 which requires Bearer auth. The proxy reads the token from a TokenProvider (injected by the controller) and includes it in the Authorization header. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/proxy/handler.go | 8 ++++-- internal/proxy/handler_test.go | 4 +-- internal/proxy/server.go | 46 +++++++++++++++++++++++----------- internal/proxy/server_test.go | 4 +-- 4 files changed, 41 insertions(+), 21 deletions(-) diff --git a/internal/proxy/handler.go b/internal/proxy/handler.go index 1b9a028..c366339 100644 --- a/internal/proxy/handler.go +++ b/internal/proxy/handler.go @@ -43,7 +43,7 @@ func (s *Server) handlePurge(w http.ResponseWriter, r *http.Request, pods []stri // handleBAN handles BAN requests (both BAN method and POST /ban). // It validates the ban expression and broadcasts it to the agent API on each pod. -func (s *Server) handleBAN(w http.ResponseWriter, r *http.Request, pods []string) { +func (s *Server) handleBAN(w http.ResponseWriter, r *http.Request, pods []string, namespace string) { var expression string switch r.Method { @@ -77,10 +77,14 @@ func (s *Server) handleBAN(w http.ResponseWriter, r *http.Request, pods []string podAddrs := withPort(pods, agentPort) bodyBytes, _ := json.Marshal(banRESTRequest{Expression: expression}) + headers := map[string]string{"Content-Type": "application/json"} + if token := s.tokenProvider.GetToken(namespace); token != "" { + headers["Authorization"] = "Bearer " + token + } req := BroadcastRequest{ Method: http.MethodPost, Path: "/ban", - Headers: map[string]string{"Content-Type": "application/json"}, + Headers: headers, Body: bodyBytes, } diff --git a/internal/proxy/handler_test.go b/internal/proxy/handler_test.go index 6367057..786e252 100644 --- a/internal/proxy/handler_test.go +++ b/internal/proxy/handler_test.go @@ -36,7 +36,7 @@ func newTestServer(mb *MockBroadcaster) *Server { pm := NewPodMap() pm.Update("production", "my-cache", []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}) - return NewServer(":8090", router, pm, mb) + return NewServer(":8090", router, pm, mb, nil) } func okResult() BroadcastResult { @@ -276,7 +276,7 @@ func TestNoPods(t *testing.T) { }) // PodMap deliberately empty. pm := NewPodMap() - srv := NewServer(":8090", router, pm, mb) + srv := NewServer(":8090", router, pm, mb, nil) req := httptest.NewRequest("PURGE", "/product/123", nil) req.Host = "my-cache-invalidation.production" diff --git a/internal/proxy/server.go b/internal/proxy/server.go index 758ea7e..fc7da74 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -6,27 +6,43 @@ import ( "time" ) +// TokenProvider returns the agent Bearer token for a given namespace. +type TokenProvider interface { + GetToken(namespace string) string +} + +// NoopTokenProvider returns empty tokens (no auth). +type NoopTokenProvider struct{} + +// GetToken always returns an empty string (no auth). +func (n *NoopTokenProvider) GetToken(_ string) string { return "" } + // Server is the Purge/BAN broadcast proxy HTTP server, listening on port 8090. type Server struct { - addr string - router Router - podMap PodIPProvider - broadcaster Broadcaster - acl map[string]*ACL // per-cache ACLs, keyed by "namespace/cacheName" - rateLimiter RateLimiter + addr string + router Router + podMap PodIPProvider + broadcaster Broadcaster + tokenProvider TokenProvider + acl map[string]*ACL // per-cache ACLs, keyed by "namespace/cacheName" + rateLimiter RateLimiter } // NewServer creates a new Server with the given dependencies. // acl and rateLimiter may be nil; nil acl allows all sources, nil rateLimiter // disables rate limiting. -func NewServer(addr string, router Router, pods PodIPProvider, b Broadcaster) *Server { +func NewServer(addr string, router Router, pods PodIPProvider, b Broadcaster, tp TokenProvider) *Server { + if tp == nil { + tp = &NoopTokenProvider{} + } return &Server{ - addr: addr, - router: router, - podMap: pods, - broadcaster: b, - acl: make(map[string]*ACL), - rateLimiter: &NoopRateLimiter{}, + addr: addr, + router: router, + podMap: pods, + broadcaster: b, + tokenProvider: tp, + acl: make(map[string]*ACL), + rateLimiter: &NoopRateLimiter{}, } } @@ -80,9 +96,9 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { case r.Method == "PURGE": s.handlePurge(w, r, pods) case r.Method == "BAN": - s.handleBAN(w, r, pods) + s.handleBAN(w, r, pods, namespace) case r.Method == http.MethodPost && r.URL.Path == "/ban": - s.handleBAN(w, r, pods) + s.handleBAN(w, r, pods, namespace) case r.Method == http.MethodPost && r.URL.Path == "/purge/xkey": s.handleXkey(w, r, pods) default: diff --git a/internal/proxy/server_test.go b/internal/proxy/server_test.go index 11a54a3..666df52 100644 --- a/internal/proxy/server_test.go +++ b/internal/proxy/server_test.go @@ -18,7 +18,7 @@ func TestServerStartStop(t *testing.T) { pm := NewPodMap() pm.Update("production", "my-cache", []string{"10.0.0.1"}) - srv := NewServer("127.0.0.1:0", router, pm, mb) + srv := NewServer("127.0.0.1:0", router, pm, mb, nil) // Use a fixed port in test range. srv.addr = "127.0.0.1:19876" @@ -57,7 +57,7 @@ func TestNewServer_Defaults(t *testing.T) { pm := NewPodMap() mb := &MockBroadcaster{} - srv := NewServer(":8090", router, pm, mb) + srv := NewServer(":8090", router, pm, mb, nil) assert.NotNil(t, srv) assert.Equal(t, ":8090", srv.addr) From 40e6ae0f101acec99382dfb5d8de5a4b60254b18 Mon Sep 17 00:00:00 2001 From: "Jens W. Klein" Date: Wed, 8 Apr 2026 15:24:29 +0200 Subject: [PATCH 2/3] feat: wire proxy PodMap/Router into reconciler and add K8sTokenProvider Reconciler updates PodMap with ready peer IPs and registers/unregisters routes on VinylCache create/delete. K8sTokenProvider reads agent token from per-namespace Secret for BAN request auth. Fields are optional (nil-safe) so proxy can be disabled. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/controller/finalizer.go | 8 +++ internal/controller/token_provider.go | 61 ++++++++++++++++++++ internal/controller/vinylcache_controller.go | 16 +++++ 3 files changed, 85 insertions(+) create mode 100644 internal/controller/token_provider.go diff --git a/internal/controller/finalizer.go b/internal/controller/finalizer.go index bc047b6..abbfde2 100644 --- a/internal/controller/finalizer.go +++ b/internal/controller/finalizer.go @@ -55,6 +55,14 @@ func (r *VinylCacheReconciler) handleDeletion(ctx context.Context, vc *v1alpha1. return ctrl.Result{}, err } + // Clean up proxy routing and pod map. + if r.ProxyRouter != nil { + r.ProxyRouter.Unregister(vc.Namespace, vc.Name) + } + if r.ProxyPodMap != nil { + r.ProxyPodMap.Delete(vc.Namespace, vc.Name) + } + // Remove finalizer — OwnerRef-controlled resources (StatefulSet, headless service, // traffic service, secret) will be garbage-collected by Kubernetes automatically. controllerutil.RemoveFinalizer(vc, finalizerName) diff --git a/internal/controller/token_provider.go b/internal/controller/token_provider.go new file mode 100644 index 0000000..7584e60 --- /dev/null +++ b/internal/controller/token_provider.go @@ -0,0 +1,61 @@ +/* +Copyright 2026. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +// K8sTokenProvider reads agent tokens from Kubernetes Secrets. +// It implements proxy.TokenProvider. +type K8sTokenProvider struct { + client client.Reader +} + +// NewK8sTokenProvider creates a new K8sTokenProvider. +func NewK8sTokenProvider(c client.Reader) *K8sTokenProvider { + return &K8sTokenProvider{client: c} +} + +// GetToken reads the agent-token from the per-namespace Secret. +// Returns empty string on error (unauthenticated fallback). +func (p *K8sTokenProvider) GetToken(namespace string) string { + log := logf.Log.WithName("token-provider") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + secret := &corev1.Secret{} + key := types.NamespacedName{Name: agentSecretName, Namespace: namespace} + if err := p.client.Get(ctx, key, secret); err != nil { + log.Error(err, "Failed to read agent secret", "namespace", namespace) + return "" + } + + token, ok := secret.Data["agent-token"] + if !ok { + log.Info("Agent secret missing 'agent-token' key", "namespace", namespace) + return "" + } + return string(token) +} diff --git a/internal/controller/vinylcache_controller.go b/internal/controller/vinylcache_controller.go index 391f898..007329c 100644 --- a/internal/controller/vinylcache_controller.go +++ b/internal/controller/vinylcache_controller.go @@ -35,6 +35,7 @@ import ( v1alpha1 "github.com/bluedynamics/cloud-vinyl/api/v1alpha1" "github.com/bluedynamics/cloud-vinyl/internal/generator" + "github.com/bluedynamics/cloud-vinyl/internal/proxy" ) const ( @@ -52,6 +53,9 @@ type VinylCacheReconciler struct { // OperatorIP is the operator pod's own IP address, used to populate the // invalidation EndpointSlice. Set from the POD_IP environment variable. OperatorIP string + // Proxy integration (optional — nil when proxy is disabled). + ProxyRouter *proxy.RegisteredRouter + ProxyPodMap *proxy.PodMap } // +kubebuilder:rbac:groups=vinyl.bluedynamics.eu,resources=vinylcaches,verbs=get;list;watch;create;update;patch;delete @@ -130,6 +134,18 @@ func (r *VinylCacheReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } + // Update proxy routing and pod map. + if r.ProxyRouter != nil { + r.ProxyRouter.Register(vc.Namespace, vc.Name) + } + if r.ProxyPodMap != nil { + var podIPs []string + for _, p := range peers { + podIPs = append(podIPs, p.IP) + } + r.ProxyPodMap.Update(vc.Namespace, vc.Name, podIPs) + } + activeHash := "" if vc.Status.ActiveVCL != nil { activeHash = vc.Status.ActiveVCL.Hash From f3e69840348b9b7fce54fd9f75553816ff81f40e Mon Sep 17 00:00:00 2001 From: "Jens W. Klein" Date: Wed, 8 Apr 2026 15:26:07 +0200 Subject: [PATCH 3/3] feat: start invalidation proxy on operator port 8090 Proxy runs on all replicas (not just leader) because the invalidation Service load-balances across all operator pods. Router and PodMap are shared with the reconciler for dynamic pod/route updates. Co-Authored-By: Claude Opus 4.6 (1M context) --- cmd/operator/main.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 6bd5d31..86eab45 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -19,12 +19,12 @@ package main import ( "crypto/tls" "flag" + "net/http" "os" + "time" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. - "net/http" - _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/apimachinery/pkg/runtime" @@ -40,6 +40,7 @@ import ( vinylv1alpha1 "github.com/bluedynamics/cloud-vinyl/api/v1alpha1" "github.com/bluedynamics/cloud-vinyl/internal/controller" "github.com/bluedynamics/cloud-vinyl/internal/generator" + "github.com/bluedynamics/cloud-vinyl/internal/proxy" webhookv1alpha1 "github.com/bluedynamics/cloud-vinyl/internal/webhook/v1alpha1" // +kubebuilder:scaffold:imports ) @@ -182,6 +183,21 @@ func main() { os.Exit(1) } + // --- Invalidation proxy (runs on all replicas, not just leader) --- + proxyRouter := proxy.NewRegisteredRouter() + proxyPodMap := proxy.NewPodMap() + broadcaster := proxy.NewHTTPBroadcaster(10 * time.Second) + tokenProvider := controller.NewK8sTokenProvider(mgr.GetClient()) + proxyServer := proxy.NewServer(":8090", proxyRouter, proxyPodMap, broadcaster, tokenProvider) + + // Start proxy in background goroutine. + go func() { + setupLog.Info("Starting invalidation proxy", "addr", ":8090") + if err := proxyServer.Start(ctrl.SetupSignalHandler()); err != nil { + setupLog.Error(err, "Invalidation proxy failed") + } + }() + if err := (&controller.VinylCacheReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -190,6 +206,8 @@ func main() { HTTPClient: &http.Client{}, K8sClient: mgr.GetClient(), }, + ProxyRouter: proxyRouter, + ProxyPodMap: proxyPodMap, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "Failed to create controller", "controller", "VinylCache") os.Exit(1)