Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package main
import (
"crypto/tls"
"flag"
"net/http"
"os"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
"net/http"

_ "k8s.io/client-go/plugin/pkg/client/auth"

"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -40,6 +40,7 @@ import (
vinylv1alpha1 "github.com/bluedynamics/cloud-vinyl/api/v1alpha1"
"github.com/bluedynamics/cloud-vinyl/internal/controller"
"github.com/bluedynamics/cloud-vinyl/internal/generator"
"github.com/bluedynamics/cloud-vinyl/internal/proxy"
webhookv1alpha1 "github.com/bluedynamics/cloud-vinyl/internal/webhook/v1alpha1"
// +kubebuilder:scaffold:imports
)
Expand Down Expand Up @@ -182,6 +183,21 @@ func main() {
os.Exit(1)
}

// --- Invalidation proxy (runs on all replicas, not just leader) ---
proxyRouter := proxy.NewRegisteredRouter()
proxyPodMap := proxy.NewPodMap()
broadcaster := proxy.NewHTTPBroadcaster(10 * time.Second)
tokenProvider := controller.NewK8sTokenProvider(mgr.GetClient())
proxyServer := proxy.NewServer(":8090", proxyRouter, proxyPodMap, broadcaster, tokenProvider)

// Start proxy in background goroutine.
go func() {
setupLog.Info("Starting invalidation proxy", "addr", ":8090")
if err := proxyServer.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "Invalidation proxy failed")
}
}()

if err := (&controller.VinylCacheReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -190,6 +206,8 @@ func main() {
HTTPClient: &http.Client{},
K8sClient: mgr.GetClient(),
},
ProxyRouter: proxyRouter,
ProxyPodMap: proxyPodMap,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Failed to create controller", "controller", "VinylCache")
os.Exit(1)
Expand Down
8 changes: 8 additions & 0 deletions internal/controller/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ func (r *VinylCacheReconciler) handleDeletion(ctx context.Context, vc *v1alpha1.
return ctrl.Result{}, err
}

// Clean up proxy routing and pod map.
if r.ProxyRouter != nil {
r.ProxyRouter.Unregister(vc.Namespace, vc.Name)
}
if r.ProxyPodMap != nil {
r.ProxyPodMap.Delete(vc.Namespace, vc.Name)
}

// Remove finalizer — OwnerRef-controlled resources (StatefulSet, headless service,
// traffic service, secret) will be garbage-collected by Kubernetes automatically.
controllerutil.RemoveFinalizer(vc, finalizerName)
Expand Down
61 changes: 61 additions & 0 deletions internal/controller/token_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2026.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

// K8sTokenProvider reads agent tokens from Kubernetes Secrets.
// It implements proxy.TokenProvider.
type K8sTokenProvider struct {
client client.Reader
}

// NewK8sTokenProvider creates a new K8sTokenProvider.
func NewK8sTokenProvider(c client.Reader) *K8sTokenProvider {
return &K8sTokenProvider{client: c}
}

// GetToken reads the agent-token from the per-namespace Secret.
// Returns empty string on error (unauthenticated fallback).
func (p *K8sTokenProvider) GetToken(namespace string) string {
log := logf.Log.WithName("token-provider")

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

secret := &corev1.Secret{}
key := types.NamespacedName{Name: agentSecretName, Namespace: namespace}
if err := p.client.Get(ctx, key, secret); err != nil {
log.Error(err, "Failed to read agent secret", "namespace", namespace)
return ""
}

token, ok := secret.Data["agent-token"]
if !ok {
log.Info("Agent secret missing 'agent-token' key", "namespace", namespace)
return ""
}
return string(token)
}
16 changes: 16 additions & 0 deletions internal/controller/vinylcache_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

v1alpha1 "github.com/bluedynamics/cloud-vinyl/api/v1alpha1"
"github.com/bluedynamics/cloud-vinyl/internal/generator"
"github.com/bluedynamics/cloud-vinyl/internal/proxy"
)

const (
Expand All @@ -52,6 +53,9 @@ type VinylCacheReconciler struct {
// OperatorIP is the operator pod's own IP address, used to populate the
// invalidation EndpointSlice. Set from the POD_IP environment variable.
OperatorIP string
// Proxy integration (optional — nil when proxy is disabled).
ProxyRouter *proxy.RegisteredRouter
ProxyPodMap *proxy.PodMap
}

// +kubebuilder:rbac:groups=vinyl.bluedynamics.eu,resources=vinylcaches,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -130,6 +134,18 @@ func (r *VinylCacheReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

// Update proxy routing and pod map.
if r.ProxyRouter != nil {
r.ProxyRouter.Register(vc.Namespace, vc.Name)
}
if r.ProxyPodMap != nil {
var podIPs []string
for _, p := range peers {
podIPs = append(podIPs, p.IP)
}
r.ProxyPodMap.Update(vc.Namespace, vc.Name, podIPs)
}

activeHash := ""
if vc.Status.ActiveVCL != nil {
activeHash = vc.Status.ActiveVCL.Hash
Expand Down
8 changes: 6 additions & 2 deletions internal/proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *Server) handlePurge(w http.ResponseWriter, r *http.Request, pods []stri

// handleBAN handles BAN requests (both BAN method and POST /ban).
// It validates the ban expression and broadcasts it to the agent API on each pod.
func (s *Server) handleBAN(w http.ResponseWriter, r *http.Request, pods []string) {
func (s *Server) handleBAN(w http.ResponseWriter, r *http.Request, pods []string, namespace string) {
var expression string

switch r.Method {
Expand Down Expand Up @@ -77,10 +77,14 @@ func (s *Server) handleBAN(w http.ResponseWriter, r *http.Request, pods []string

podAddrs := withPort(pods, agentPort)
bodyBytes, _ := json.Marshal(banRESTRequest{Expression: expression})
headers := map[string]string{"Content-Type": "application/json"}
if token := s.tokenProvider.GetToken(namespace); token != "" {
headers["Authorization"] = "Bearer " + token
}
req := BroadcastRequest{
Method: http.MethodPost,
Path: "/ban",
Headers: map[string]string{"Content-Type": "application/json"},
Headers: headers,
Body: bodyBytes,
}

Expand Down
4 changes: 2 additions & 2 deletions internal/proxy/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newTestServer(mb *MockBroadcaster) *Server {
pm := NewPodMap()
pm.Update("production", "my-cache", []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"})

return NewServer(":8090", router, pm, mb)
return NewServer(":8090", router, pm, mb, nil)
}

func okResult() BroadcastResult {
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestNoPods(t *testing.T) {
})
// PodMap deliberately empty.
pm := NewPodMap()
srv := NewServer(":8090", router, pm, mb)
srv := NewServer(":8090", router, pm, mb, nil)

req := httptest.NewRequest("PURGE", "/product/123", nil)
req.Host = "my-cache-invalidation.production"
Expand Down
46 changes: 31 additions & 15 deletions internal/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,43 @@ import (
"time"
)

// TokenProvider returns the agent Bearer token for a given namespace.
type TokenProvider interface {
GetToken(namespace string) string
}

// NoopTokenProvider returns empty tokens (no auth).
type NoopTokenProvider struct{}

// GetToken always returns an empty string (no auth).
func (n *NoopTokenProvider) GetToken(_ string) string { return "" }

// Server is the Purge/BAN broadcast proxy HTTP server, listening on port 8090.
type Server struct {
addr string
router Router
podMap PodIPProvider
broadcaster Broadcaster
acl map[string]*ACL // per-cache ACLs, keyed by "namespace/cacheName"
rateLimiter RateLimiter
addr string
router Router
podMap PodIPProvider
broadcaster Broadcaster
tokenProvider TokenProvider
acl map[string]*ACL // per-cache ACLs, keyed by "namespace/cacheName"
rateLimiter RateLimiter
}

// NewServer creates a new Server with the given dependencies.
// acl and rateLimiter may be nil; nil acl allows all sources, nil rateLimiter
// disables rate limiting.
func NewServer(addr string, router Router, pods PodIPProvider, b Broadcaster) *Server {
func NewServer(addr string, router Router, pods PodIPProvider, b Broadcaster, tp TokenProvider) *Server {
if tp == nil {
tp = &NoopTokenProvider{}
}
return &Server{
addr: addr,
router: router,
podMap: pods,
broadcaster: b,
acl: make(map[string]*ACL),
rateLimiter: &NoopRateLimiter{},
addr: addr,
router: router,
podMap: pods,
broadcaster: b,
tokenProvider: tp,
acl: make(map[string]*ACL),
rateLimiter: &NoopRateLimiter{},
}
}

Expand Down Expand Up @@ -80,9 +96,9 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case r.Method == "PURGE":
s.handlePurge(w, r, pods)
case r.Method == "BAN":
s.handleBAN(w, r, pods)
s.handleBAN(w, r, pods, namespace)
case r.Method == http.MethodPost && r.URL.Path == "/ban":
s.handleBAN(w, r, pods)
s.handleBAN(w, r, pods, namespace)
case r.Method == http.MethodPost && r.URL.Path == "/purge/xkey":
s.handleXkey(w, r, pods)
default:
Expand Down
4 changes: 2 additions & 2 deletions internal/proxy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestServerStartStop(t *testing.T) {
pm := NewPodMap()
pm.Update("production", "my-cache", []string{"10.0.0.1"})

srv := NewServer("127.0.0.1:0", router, pm, mb)
srv := NewServer("127.0.0.1:0", router, pm, mb, nil)

// Use a fixed port in test range.
srv.addr = "127.0.0.1:19876"
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestNewServer_Defaults(t *testing.T) {
pm := NewPodMap()
mb := &MockBroadcaster{}

srv := NewServer(":8090", router, pm, mb)
srv := NewServer(":8090", router, pm, mb, nil)

assert.NotNil(t, srv)
assert.Equal(t, ":8090", srv.addr)
Expand Down
Loading