From 5afe1707a414d0470863f8618d469998f64b67e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 21 Apr 2026 16:18:50 +0200 Subject: [PATCH] Migrate from deprecated v1 Endpoints to discovery.k8s.io/v1 EndpointSlice The Kubernetes v1 Endpoints API is deprecated in v1.33+. This migrates the Broker, InMemoryChannel, and EventTransform reconcilers to use EndpointSlice with label-based listing instead of name-based Endpoints lookups, and updates RBAC roles, test helpers, and all test files. --- .../roles/controller-clusterrole.yaml | 4 +- .../core/roles/controller-clusterroles.yaml | 10 +- pkg/apis/duck/lifecycle_helper.go | 15 +- pkg/apis/eventing/v1/broker_lifecycle_mt.go | 14 +- pkg/apis/eventing/v1/broker_lifecycle_test.go | 17 +- pkg/apis/eventing/v1/test_helper.go | 35 +- pkg/reconciler/broker/broker.go | 26 +- pkg/reconciler/broker/broker_test.go | 264 ++++---- pkg/reconciler/broker/controller.go | 40 +- pkg/reconciler/broker/trigger/trigger_test.go | 41 +- pkg/reconciler/eventtransform/controller.go | 13 +- .../eventtransform/eventtransform.go | 36 +- .../eventtransform/eventtransform_test.go | 569 +++++++++--------- .../inmemorychannel/controller/controller.go | 22 +- .../controller/controller_test.go | 2 +- .../controller/inmemorychannel.go | 43 +- .../controller/inmemorychannel_test.go | 99 +-- pkg/reconciler/testing/endpointslice.go | 58 ++ pkg/reconciler/testing/listers.go | 6 + pkg/reconciler/testing/v1/broker.go | 4 +- pkg/reconciler/testing/v1/endpointslice.go | 58 ++ pkg/reconciler/testing/v1/listers.go | 6 + pkg/reconciler/testing/v1beta1/listers.go | 6 + pkg/reconciler/testing/v1beta2/listers.go | 6 + pkg/reconciler/testing/v1beta3/listers.go | 6 + .../v1/endpointslice/endpointslice.go | 52 ++ .../discovery/v1/endpointslice/fake/fake.go | 40 ++ vendor/modules.txt | 2 + 28 files changed, 941 insertions(+), 553 deletions(-) create mode 100644 pkg/reconciler/testing/endpointslice.go create mode 100644 pkg/reconciler/testing/v1/endpointslice.go create mode 100644 vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/endpointslice.go create mode 100644 vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake/fake.go diff --git a/config/channels/in-memory-channel/roles/controller-clusterrole.yaml b/config/channels/in-memory-channel/roles/controller-clusterrole.yaml index 6164e834f41..c46fb295073 100644 --- a/config/channels/in-memory-channel/roles/controller-clusterrole.yaml +++ b/config/channels/in-memory-channel/roles/controller-clusterrole.yaml @@ -66,9 +66,9 @@ rules: - update - patch - apiGroups: - - "" + - "discovery.k8s.io" resources: - - endpoints + - endpointslices verbs: - get - list diff --git a/config/core/roles/controller-clusterroles.yaml b/config/core/roles/controller-clusterroles.yaml index 3c8017213af..5be6b98e9b5 100644 --- a/config/core/roles/controller-clusterroles.yaml +++ b/config/core/roles/controller-clusterroles.yaml @@ -27,7 +27,6 @@ rules: - "secrets" - "configmaps" - "services" - - "endpoints" - "events" - "serviceaccounts" - "pods" @@ -41,6 +40,15 @@ rules: - "patch" - "watch" + - apiGroups: + - "discovery.k8s.io" + resources: + - "endpointslices" + verbs: + - "get" + - "list" + - "watch" + # Brokers and the namespace annotation controllers manipulate Deployments. # RequestReply controller needs to manipulate StatefulSets - apiGroups: diff --git a/pkg/apis/duck/lifecycle_helper.go b/pkg/apis/duck/lifecycle_helper.go index 1b4badcb742..5dfe5616072 100644 --- a/pkg/apis/duck/lifecycle_helper.go +++ b/pkg/apis/duck/lifecycle_helper.go @@ -18,7 +18,7 @@ package duck import ( appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" ) // DeploymentIsAvailable determines if the provided deployment is available. Note that if it cannot @@ -33,11 +33,14 @@ func DeploymentIsAvailable(d *appsv1.DeploymentStatus, def bool) bool { return def } -// EndpointsAreAvailable determines if the provided Endpoints are available. -func EndpointsAreAvailable(ep *corev1.Endpoints) bool { - for _, subset := range ep.Subsets { - if len(subset.Addresses) > 0 { - return true +// EndpointSlicesAreAvailable determines if the provided EndpointSlices have any ready endpoints. +func EndpointSlicesAreAvailable(epSlices []*discoveryv1.EndpointSlice) bool { + for _, eps := range epSlices { + for _, ep := range eps.Endpoints { + // Per the K8s API spec, a nil Ready value should be interpreted as ready. + if ep.Conditions.Ready == nil || *ep.Conditions.Ready { + return true + } } } return false diff --git a/pkg/apis/eventing/v1/broker_lifecycle_mt.go b/pkg/apis/eventing/v1/broker_lifecycle_mt.go index a27ceaa2419..46932349c53 100644 --- a/pkg/apis/eventing/v1/broker_lifecycle_mt.go +++ b/pkg/apis/eventing/v1/broker_lifecycle_mt.go @@ -17,7 +17,7 @@ limitations under the License. package v1 import ( - corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" "knative.dev/eventing/pkg/apis/duck" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" @@ -27,11 +27,11 @@ func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interfa bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...) } -func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) { - if duck.EndpointsAreAvailable(ep) { +func (bs *BrokerStatus) PropagateIngressAvailability(epSlices []*discoveryv1.EndpointSlice) { + if duck.EndpointSlicesAreAvailable(epSlices) { bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionIngress) } else { - bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) + bs.MarkIngressFailed("EndpointSlicesUnavailable", "EndpointSlices are unavailable.") } } @@ -57,10 +57,10 @@ func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interfac bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...) } -func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) { - if duck.EndpointsAreAvailable(ep) { +func (bs *BrokerStatus) PropagateFilterAvailability(epSlices []*discoveryv1.EndpointSlice) { + if duck.EndpointSlicesAreAvailable(epSlices) { bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionFilter) } else { - bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name) + bs.MarkFilterFailed("EndpointSlicesUnavailable", "EndpointSlices are unavailable.") } } diff --git a/pkg/apis/eventing/v1/broker_lifecycle_test.go b/pkg/apis/eventing/v1/broker_lifecycle_test.go index 90ca1735fad..e0c0250b8ba 100644 --- a/pkg/apis/eventing/v1/broker_lifecycle_test.go +++ b/pkg/apis/eventing/v1/broker_lifecycle_test.go @@ -18,6 +18,7 @@ import ( "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/pointer" @@ -499,13 +500,13 @@ func TestBrokerIsReady(t *testing.T) { t.Run(test.name, func(t *testing.T) { bs := BrokerStatus{} if test.markIngressReady != nil { - var ep *corev1.Endpoints + var epSlices []*discoveryv1.EndpointSlice if *test.markIngressReady { - ep = TestHelper.AvailableEndpoints() + epSlices = TestHelper.AvailableEndpointSlices() } else { - ep = TestHelper.UnavailableEndpoints() + epSlices = TestHelper.UnavailableEndpointSlices() } - bs.PropagateIngressAvailability(ep) + bs.PropagateIngressAvailability(epSlices) } if test.markTriggerChannelReady != nil { var c *eventingduckv1.ChannelableStatus @@ -534,13 +535,13 @@ func TestBrokerIsReady(t *testing.T) { } if test.markFilterReady != nil { - var ep *corev1.Endpoints + var epSlices []*discoveryv1.EndpointSlice if *test.markFilterReady { - ep = TestHelper.AvailableEndpoints() + epSlices = TestHelper.AvailableEndpointSlices() } else { - ep = TestHelper.UnavailableEndpoints() + epSlices = TestHelper.UnavailableEndpointSlices() } - bs.PropagateFilterAvailability(ep) + bs.PropagateFilterAvailability(epSlices) } if test.markAddressable == nil && test.address == nil { diff --git a/pkg/apis/eventing/v1/test_helper.go b/pkg/apis/eventing/v1/test_helper.go index b52ce75be79..f74b08ca261 100644 --- a/pkg/apis/eventing/v1/test_helper.go +++ b/pkg/apis/eventing/v1/test_helper.go @@ -18,6 +18,7 @@ package v1 import ( corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -59,9 +60,9 @@ func (testHelper) ReadySubscriptionStatus() *messagingv1.SubscriptionStatus { func (t testHelper) ReadyBrokerStatus() *BrokerStatus { bs := &BrokerStatus{} - bs.PropagateIngressAvailability(t.AvailableEndpoints()) + bs.PropagateIngressAvailability(t.AvailableEndpointSlices()) bs.PropagateTriggerChannelReadiness(t.ReadyChannelStatus()) - bs.PropagateFilterAvailability(t.AvailableEndpoints()) + bs.PropagateFilterAvailability(t.AvailableEndpointSlices()) bs.SetAddress(&duckv1.Addressable{ URL: apis.HTTP("example.com"), }) @@ -72,9 +73,9 @@ func (t testHelper) ReadyBrokerStatus() *BrokerStatus { func (t testHelper) ReadyBrokerStatusWithoutDLS() *BrokerStatus { bs := &BrokerStatus{} - bs.PropagateIngressAvailability(t.AvailableEndpoints()) + bs.PropagateIngressAvailability(t.AvailableEndpointSlices()) bs.PropagateTriggerChannelReadiness(t.ReadyChannelStatus()) - bs.PropagateFilterAvailability(t.AvailableEndpoints()) + bs.PropagateFilterAvailability(t.AvailableEndpointSlices()) bs.SetAddress(&duckv1.Addressable{ URL: apis.HTTP("example.com"), }) @@ -102,26 +103,24 @@ func (testHelper) FalseBrokerStatus() *BrokerStatus { return bs } -func (testHelper) UnavailableEndpoints() *corev1.Endpoints { - ep := &corev1.Endpoints{} - ep.Name = "unavailable" - ep.Subsets = []corev1.EndpointSubset{{ - NotReadyAddresses: []corev1.EndpointAddress{{ - IP: "127.0.0.1", +func (testHelper) UnavailableEndpointSlices() []*discoveryv1.EndpointSlice { + ready := false + return []*discoveryv1.EndpointSlice{{ + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"127.0.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, }}, }} - return ep } -func (testHelper) AvailableEndpoints() *corev1.Endpoints { - ep := &corev1.Endpoints{} - ep.Name = "available" - ep.Subsets = []corev1.EndpointSubset{{ - Addresses: []corev1.EndpointAddress{{ - IP: "127.0.0.1", +func (testHelper) AvailableEndpointSlices() []*discoveryv1.EndpointSlice { + ready := true + return []*discoveryv1.EndpointSlice{{ + Endpoints: []discoveryv1.Endpoint{{ + Addresses: []string{"127.0.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, }}, }} - return ep } func (testHelper) ReadyChannelStatus() *eventingduckv1.ChannelableStatus { diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index 3550f73ab93..da51773dfbd 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -33,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" corev1listers "k8s.io/client-go/listers/core/v1" + discoveryv1listers "k8s.io/client-go/listers/discovery/v1" "k8s.io/utils/pointer" "knative.dev/pkg/kmeta" "knative.dev/pkg/resolver" @@ -72,10 +74,10 @@ type Reconciler struct { dynamicClientSet dynamic.Interface // listers index properties about resources - endpointsLister corev1listers.EndpointsLister - subscriptionLister messaginglisters.SubscriptionLister - configmapLister corev1listers.ConfigMapLister - secretLister corev1listers.SecretLister + endpointSliceLister discoveryv1listers.EndpointSliceLister + subscriptionLister messaginglisters.SubscriptionLister + configmapLister corev1listers.ConfigMapLister + secretLister corev1listers.SecretLister channelableTracker ducklib.ListableTracker @@ -188,21 +190,25 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk b.Status.PropagateTriggerChannelReadiness(channelStatus) - filterEndpoints, err := r.endpointsLister.Endpoints(system.Namespace()).Get(names.BrokerFilterName) + filterEpSlices, err := r.endpointSliceLister.EndpointSlices(system.Namespace()).List(labels.SelectorFromSet(labels.Set{ + discoveryv1.LabelServiceName: names.BrokerFilterName, + })) if err != nil { - logging.FromContext(ctx).Errorw("Problem getting endpoints for filter", zap.String("namespace", system.Namespace()), zap.Error(err)) + logging.FromContext(ctx).Errorw("Problem getting EndpointSlices for filter", zap.String("namespace", system.Namespace()), zap.Error(err)) b.Status.MarkFilterFailed("ServiceFailure", "%v", err) return err } - b.Status.PropagateFilterAvailability(filterEndpoints) + b.Status.PropagateFilterAvailability(filterEpSlices) - ingressEndpoints, err := r.endpointsLister.Endpoints(system.Namespace()).Get(names.BrokerIngressName) + ingressEpSlices, err := r.endpointSliceLister.EndpointSlices(system.Namespace()).List(labels.SelectorFromSet(labels.Set{ + discoveryv1.LabelServiceName: names.BrokerIngressName, + })) if err != nil { - logging.FromContext(ctx).Errorw("Problem getting endpoints for ingress", zap.String("namespace", system.Namespace()), zap.Error(err)) + logging.FromContext(ctx).Errorw("Problem getting EndpointSlices for ingress", zap.String("namespace", system.Namespace()), zap.Error(err)) b.Status.MarkIngressFailed("ServiceFailure", "%v", err) return err } - b.Status.PropagateIngressAvailability(ingressEndpoints) + b.Status.PropagateIngressAvailability(ingressEpSlices) if b.Spec.Delivery != nil && b.Spec.Delivery.DeadLetterSink != nil { deadLetterSinkAddr, err := r.uriResolver.AddressableFromDestinationV1(ctx, *b.Spec.Delivery.DeadLetterSink, b) diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index afa3e90d100..2c4b4f956c6 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -24,6 +24,7 @@ import ( "testing" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -350,7 +351,7 @@ func TestReconcile(t *testing.T) { WithTriggerChannelFailed("NoAddress", "Channel does not have an address.")), }}, }, { - Name: "Trigger Channel endpoints fails", + Name: "Trigger Channel endpoints unavailable", Key: testKey, Objects: []runtime.Object{ NewBroker(brokerName, testNS, @@ -370,12 +371,12 @@ func TestReconcile(t *testing.T) { WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), WithChannelKindAnnotation(triggerChannelKind), WithChannelNameAnnotation(triggerChannelName), - WithFilterFailed("ServiceFailure", `endpoints "broker-filter" not found`)), + WithFilterFailed("EndpointSlicesUnavailable", "EndpointSlices are unavailable."), + WithIngressFailed("EndpointSlicesUnavailable", "EndpointSlices are unavailable."), + WithBrokerAddressURI(brokerAddress), + WithDLSNotConfigured(), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), }}, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "InternalError", `endpoints "broker-filter" not found`), - }, - WantErr: true, }, { Name: "Successful Reconciliation", Key: testKey, @@ -386,12 +387,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, @@ -418,12 +419,12 @@ func TestReconcile(t *testing.T) { WithBrokerAnnotation(v1.AsyncHandlerAnnotation, "true")), createChannel(withChannelReady), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, @@ -458,12 +459,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady, withChannelStatusCACerts(testCaCerts)), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, @@ -489,12 +490,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady, withChannelStatusAudience(channelAudience)), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, @@ -520,12 +521,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady), imcConfigMapLegacy(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, @@ -550,12 +551,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WithReactors: []clientgotesting.ReactionFunc{ InduceFailure("update", "brokers"), @@ -588,12 +589,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelDeadLetterSink(brokerDestv1)), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, @@ -615,12 +616,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady, withChannelDeadLetterSink(sinkSVCDest)), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, @@ -649,12 +650,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady, withChannelDeadLetterSink(alternateDLSDest)), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, @@ -685,12 +686,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewBroker(brokerName, testNS, @@ -720,12 +721,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady, withChannelDeadLetterSink(sinkSVCDest)), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), makeTLSSecret(), }, WantErr: false, @@ -771,12 +772,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady, withChannelDeadLetterSink(sinkSVCDest)), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), makeTLSSecret(), }, WantErr: false, @@ -823,12 +824,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady, withChannelDeadLetterSink(sinkSVCDest)), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WantErr: false, WantCreates: []runtime.Object{ @@ -869,12 +870,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady, withChannelDeadLetterSink(sinkSVCDest)), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WantErr: false, WantCreates: []runtime.Object{ @@ -913,12 +914,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), NewEventPolicy(readyEventPolicyName, testNS, WithReadyEventPolicyCondition, WithEventPolicyToRef(brokerV1GVK, brokerName), @@ -958,12 +959,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), NewEventPolicy(unreadyEventPolicyName, testNS, WithUnreadyEventPolicyCondition("", ""), WithEventPolicyToRef(brokerV1GVK, brokerName), @@ -1002,12 +1003,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), NewEventPolicy(readyEventPolicyName, testNS, WithReadyEventPolicyCondition, WithEventPolicyToRef(brokerV1GVK, brokerName), @@ -1051,12 +1052,12 @@ func TestReconcile(t *testing.T) { WithInitBrokerConditions), createChannel(withChannelReady), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpointSlice(filterServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(FilterLabels(), filterServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), + NewEndpointSlice(ingressServiceName, systemNS, + WithEndpointSliceLabels(withServiceNameLabel(IngressLabels(), ingressServiceName)), + WithEndpointSliceAddresses(readyEndpoint("127.0.0.1"))), }, WantCreates: []runtime.Object{ makeEventPolicy(), @@ -1102,15 +1103,15 @@ func TestReconcile(t *testing.T) { } r := &Reconciler{ - eventingClientSet: fakeeventingclient.Get(ctx), - dynamicClientSet: fakedynamicclient.Get(ctx), - subscriptionLister: listers.GetSubscriptionLister(), - endpointsLister: listers.GetEndpointsLister(), - configmapLister: listers.GetConfigMapLister(), - secretLister: listers.GetSecretLister(), - channelableTracker: duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0)), - uriResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)), - eventPolicyLister: listers.GetEventPolicyLister(), + eventingClientSet: fakeeventingclient.Get(ctx), + dynamicClientSet: fakedynamicclient.Get(ctx), + subscriptionLister: listers.GetSubscriptionLister(), + endpointSliceLister: listers.GetEndpointSliceLister(), + configmapLister: listers.GetConfigMapLister(), + secretLister: listers.GetSecretLister(), + channelableTracker: duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0)), + uriResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)), + eventPolicyLister: listers.GetEventPolicyLister(), } return broker.NewReconciler(ctx, logger, fakeeventingclient.Get(ctx), listers.GetBrokerLister(), @@ -1302,6 +1303,23 @@ func IngressLabels() map[string]string { } } +func readyEndpoint(ip string) discoveryv1.Endpoint { + ready := true + return discoveryv1.Endpoint{ + Addresses: []string{ip}, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, + } +} + +func withServiceNameLabel(labels map[string]string, serviceName string) map[string]string { + merged := make(map[string]string, len(labels)+1) + for k, v := range labels { + merged[k] = v + } + merged[discoveryv1.LabelServiceName] = serviceName + return merged +} + func makeDLSServiceAsUnstructured() *unstructured.Unstructured { return &unstructured.Unstructured{ Object: map[string]interface{}{ diff --git a/pkg/reconciler/broker/controller.go b/pkg/reconciler/broker/controller.go index beb7bb08cab..64bf96a3705 100644 --- a/pkg/reconciler/broker/controller.go +++ b/pkg/reconciler/broker/controller.go @@ -45,7 +45,6 @@ import ( subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription" brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" "knative.dev/eventing/pkg/duck" - "knative.dev/eventing/pkg/reconciler/names" ) const ( @@ -66,9 +65,9 @@ func NewController( brokerInformer := brokerinformer.Get(ctx) subscriptionInformer := subscriptioninformer.Get(ctx) - endpointsInformer := namespacedinformerfactory.Get(ctx).Core().V1().Endpoints() - if err := controller.StartInformers(ctx.Done(), endpointsInformer.Informer()); err != nil { - logger.Fatalw("Failed to start namespaced endpoints informer", zap.Error(err)) + endpointSliceInformer := namespacedinformerfactory.Get(ctx).Discovery().V1().EndpointSlices() + if err := controller.StartInformers(ctx.Done(), endpointSliceInformer.Informer()); err != nil { + logger.Fatalw("Failed to start namespaced EndpointSlice informer", zap.Error(err)) } configmapInformer := configmapinformer.Get(ctx) @@ -95,14 +94,14 @@ func NewController( brokerFilter := pkgreconciler.AnnotationFilterFunc(brokerreconciler.ClassAnnotationKey, eventing.MTChannelBrokerClassValue, false /*allowUnset*/) r := &Reconciler{ - eventingClientSet: eventingclient.Get(ctx), - dynamicClientSet: dynamicclient.Get(ctx), - endpointsLister: endpointsInformer.Lister(), - subscriptionLister: subscriptionInformer.Lister(), - brokerClass: eventing.MTChannelBrokerClassValue, - configmapLister: configmapInformer.Lister(), - secretLister: secretInformer.Lister(), - eventPolicyLister: eventPolicyInformer.Lister(), + eventingClientSet: eventingclient.Get(ctx), + dynamicClientSet: dynamicclient.Get(ctx), + endpointSliceLister: endpointSliceInformer.Lister(), + subscriptionLister: subscriptionInformer.Lister(), + brokerClass: eventing.MTChannelBrokerClassValue, + configmapLister: configmapInformer.Lister(), + secretLister: secretInformer.Lister(), + eventPolicyLister: eventPolicyInformer.Lister(), } impl := brokerreconciler.NewImpl(ctx, r, eventing.MTChannelBrokerClassValue, func(impl *controller.Impl) controller.Options { return controller.Options{ @@ -128,19 +127,10 @@ func NewController( logger.Info("Doing a global resync due to endpoint changes in shared broker component") impl.FilteredGlobalResync(brokerFilter, brokerInformer.Informer()) } - // Resync for the filter. - endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: pkgreconciler.ChainFilterFuncs( - pkgreconciler.NamespaceFilterFunc(system.Namespace()), - pkgreconciler.NameFilterFunc(names.BrokerFilterName)), - Handler: controller.HandleAll(globalResync), - }) - // Resync for the ingress. - endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: pkgreconciler.ChainFilterFuncs( - pkgreconciler.NamespaceFilterFunc(system.Namespace()), - pkgreconciler.NameFilterFunc(names.BrokerIngressName)), - Handler: controller.HandleAll(globalResync), + // Resync for changes to EndpointSlices backing filter/ingress services. + endpointSliceInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: pkgreconciler.NamespaceFilterFunc(system.Namespace()), + Handler: controller.HandleAll(globalResync), }) secretInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithName(ingressServerTLSSecretName), diff --git a/pkg/reconciler/broker/trigger/trigger_test.go b/pkg/reconciler/broker/trigger/trigger_test.go index 30cf481c347..5058534d0e6 100644 --- a/pkg/reconciler/broker/trigger/trigger_test.go +++ b/pkg/reconciler/broker/trigger/trigger_test.go @@ -25,6 +25,7 @@ import ( authenticationv1 "k8s.io/api/authentication/v1" authv1 "k8s.io/api/authorization/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -2046,12 +2047,40 @@ func allBrokerObjectsReadyPlus(objs ...runtime.Object) []runtime.Object { WithChannelNameAnnotation(triggerChannelName)), createChannel(testNS, true), imcConfigMap(), - NewEndpoints(filterServiceName, systemNS, - WithEndpointsLabels(FilterLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), - NewEndpoints(ingressServiceName, systemNS, - WithEndpointsLabels(IngressLabels()), - WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: systemNS, + Name: filterServiceName + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: filterServiceName, + "eventing.knative.dev/brokerRole": "filter", + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"127.0.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, + }, + }, + }, + &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: systemNS, + Name: ingressServiceName + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: ingressServiceName, + "eventing.knative.dev/brokerRole": "ingress", + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"127.0.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, + }, + }, + }, } return append(brokerObjs[:], objs...) } diff --git a/pkg/reconciler/eventtransform/controller.go b/pkg/reconciler/eventtransform/controller.go index 17cbf2b6c10..3e546e2bf04 100644 --- a/pkg/reconciler/eventtransform/controller.go +++ b/pkg/reconciler/eventtransform/controller.go @@ -74,15 +74,14 @@ func NewController( eventPolicyInformer := eventpolicyinformer.Get(ctx) rolebindingInformer := rolebindinginformer.Get(ctx) - // Create a custom informer as one in knative/pkg doesn't exist for endpoints. - jsonataEndpointFactory := informers.NewSharedInformerFactoryWithOptions( + jsonataEndpointSliceFactory := informers.NewSharedInformerFactoryWithOptions( kubeclient.Get(ctx), controller.DefaultResyncPeriod, informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = JsonataResourcesSelector }), ) - jsonataEndpointInformer := jsonataEndpointFactory.Core().V1().Endpoints() + jsonataEndpointSliceInformer := jsonataEndpointSliceFactory.Discovery().V1().EndpointSlices() env := &envConfig{} if err := envconfig.Process("", env); err != nil { @@ -115,7 +114,7 @@ func NewController( jsonataConfigMapLister: jsonataConfigMapInformer.Lister(), jsonataDeploymentsLister: jsonataDeploymentInformer.Lister(), jsonataServiceLister: jsonataServiceInformer.Lister(), - jsonataEndpointLister: jsonataEndpointInformer.Lister(), + jsonataEndpointSliceLister: jsonataEndpointSliceInformer.Lister(), jsonataSinkBindingLister: jsonataSinkBindingInformer.Lister(), cmCertificateLister: dynamicCertificatesInformer.Lister(), certificatesSecretLister: certificatesSecretInformer.Lister(), @@ -142,7 +141,7 @@ func NewController( jsonataDeploymentInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) jsonataServiceInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) - jsonataEndpointInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) + jsonataEndpointSliceInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) jsonataConfigMapInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) jsonataSinkBindingInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) @@ -154,8 +153,8 @@ func NewController( )) // Start the factory after creating all necessary informers. - jsonataEndpointFactory.Start(ctx.Done()) - jsonataEndpointFactory.WaitForCacheSync(ctx.Done()) + jsonataEndpointSliceFactory.Start(ctx.Done()) + jsonataEndpointSliceFactory.WaitForCacheSync(ctx.Done()) return impl } diff --git a/pkg/reconciler/eventtransform/eventtransform.go b/pkg/reconciler/eventtransform/eventtransform.go index f67f3301ac9..1f0647835cc 100644 --- a/pkg/reconciler/eventtransform/eventtransform.go +++ b/pkg/reconciler/eventtransform/eventtransform.go @@ -28,6 +28,7 @@ import ( "go.uber.org/zap/zapcore" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -36,6 +37,7 @@ import ( "k8s.io/client-go/kubernetes" appslister "k8s.io/client-go/listers/apps/v1" corelister "k8s.io/client-go/listers/core/v1" + discoveryv1listers "k8s.io/client-go/listers/discovery/v1" rbacv1listers "k8s.io/client-go/listers/rbac/v1" "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/auth" @@ -65,7 +67,7 @@ type Reconciler struct { jsonataConfigMapLister corelister.ConfigMapLister jsonataDeploymentsLister appslister.DeploymentLister jsonataServiceLister corelister.ServiceLister - jsonataEndpointLister corelister.EndpointsLister + jsonataEndpointSliceLister discoveryv1listers.EndpointSliceLister jsonataSinkBindingLister sourceslisters.SinkBindingLister cmCertificateLister *atomic.Pointer[cmlisters.CertificateLister] certificatesSecretLister corelister.SecretLister @@ -329,15 +331,31 @@ func (r *Reconciler) reconcileJsonataTransformationSinkBinding(ctx context.Conte func (r *Reconciler) reconcileJsonataTransformationAddress(ctx context.Context, transform *eventing.EventTransform) error { service := jsonataService(ctx, transform) - endpoint, err := r.jsonataEndpointLister.Endpoints(transform.GetNamespace()).Get(service.GetName()) - if apierrors.IsNotFound(err) { + epSlices, err := r.jsonataEndpointSliceLister.EndpointSlices(transform.GetNamespace()).List(labels.SelectorFromSet(labels.Set{ + discoveryv1.LabelServiceName: service.GetName(), + })) + if err != nil { + return fmt.Errorf("failed to list jsonata EndpointSlices: %w", err) + } + if len(epSlices) == 0 { transform.Status.MarkWaitingForServiceEndpoints() return controller.NewSkipKey("") } - if err != nil { - return fmt.Errorf("failed to list jsonata endpoints: %w", err) + + hasReadyEndpoint := false + hasPort := false + for _, eps := range epSlices { + if len(eps.Ports) > 0 { + hasPort = true + } + for _, ep := range eps.Endpoints { + if ep.Conditions.Ready == nil || *ep.Conditions.Ready { + hasReadyEndpoint = true + break + } + } } - if len(endpoint.Subsets) == 0 || len(endpoint.Subsets[0].Ports) == 0 || len(endpoint.Subsets[0].Addresses) == 0 { + if !hasReadyEndpoint || !hasPort { transform.Status.MarkWaitingForServiceEndpoints() return controller.NewSkipKey("") } @@ -347,9 +365,9 @@ func (r *Reconciler) reconcileJsonataTransformationAddress(ctx context.Context, if f.IsOIDCAuthentication() { expectedPort = 3129 } - for _, sub := range endpoint.Subsets { - for _, p := range sub.Ports { - if p.Port != expectedPort { + for _, eps := range epSlices { + for _, p := range eps.Ports { + if p.Port == nil || *p.Port != expectedPort { transform.Status.MarkWaitingForServiceEndpoints() return controller.NewSkipKey("") } diff --git a/pkg/reconciler/eventtransform/eventtransform_test.go b/pkg/reconciler/eventtransform/eventtransform_test.go index cec039cda7c..902cf038889 100644 --- a/pkg/reconciler/eventtransform/eventtransform_test.go +++ b/pkg/reconciler/eventtransform/eventtransform_test.go @@ -28,6 +28,7 @@ import ( cmlisters "github.com/cert-manager/cert-manager/pkg/client/listers/certmanager/v1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -209,18 +210,18 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, + Port: ptr.Int32(8080), }, }, }, @@ -268,23 +269,24 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -338,23 +340,24 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -408,23 +411,24 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -479,23 +483,24 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -553,23 +558,24 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -617,23 +623,24 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -682,23 +689,24 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -750,23 +758,24 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -832,23 +841,24 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -919,23 +929,24 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -1001,23 +1012,24 @@ func TestReconcile(t *testing.T) { } }), jsonataReplyExpressionTestConfigMap(ctx), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -1076,23 +1088,24 @@ func TestReconcile(t *testing.T) { WithEventTransformSink(sink), ), jsonataExpressionTestConfigMap(ctx), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -1154,23 +1167,24 @@ func TestReconcile(t *testing.T) { } }), jsonataExpressionTestConfigMap(ctx), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -1236,23 +1250,24 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8443, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8443), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -1353,23 +1368,24 @@ func TestReconcile(t *testing.T) { d.Spec.Template.Annotations[JsonataCertificateRevisionKey] = "1" }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8443, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8443), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -1449,26 +1465,27 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 80, - }, - { - Port: 443, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(80), + }, + { + Port: ptr.Int32(443), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -1567,26 +1584,27 @@ func TestReconcile(t *testing.T) { d.Spec.Template.Annotations[JsonataCertificateRevisionKey] = "1" }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - { - Port: 8443, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + { + Port: ptr.Int32(8443), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -1818,26 +1836,27 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 8080, - }, - { - Port: 8443, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(8080), + }, + { + Port: ptr.Int32(8443), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -1959,23 +1978,24 @@ func TestReconcile(t *testing.T) { d.Spec.Template.Annotations[JsonataCertificateRevisionKey] = "1" }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: jsonataTestService(ctx).Namespace, - Name: jsonataTestService(ctx).Name, + Name: jsonataTestService(ctx).Name + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: jsonataTestService(ctx).Name, + }, }, - Subsets: []corev1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ { - Ports: []corev1.EndpointPort{ - { - Port: 80, - }, - }, - Addresses: []corev1.EndpointAddress{ - { - IP: "192.168.0.1", - }, - }, + Port: ptr.Int32(80), + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -2050,15 +2070,24 @@ func TestReconcile(t *testing.T) { UnavailableReplicas: 0, } }), - &corev1.Endpoints{ + &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ Namespace: testNS, - Name: fmt.Sprintf("%s-%s", testName, "jsonata"), + Name: fmt.Sprintf("%s-%s", testName, "jsonata") + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: fmt.Sprintf("%s-%s", testName, "jsonata"), + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Ports: []discoveryv1.EndpointPort{ + { + Port: ptr.Int32(3128), + }, }, - Subsets: []corev1.EndpointSubset{ + Endpoints: []discoveryv1.Endpoint{ { - Ports: []corev1.EndpointPort{{Port: 3128}}, - Addresses: []corev1.EndpointAddress{{IP: "192.168.0.1"}}, + Addresses: []string{"192.168.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.Bool(true)}, }, }, }, @@ -2117,7 +2146,7 @@ func TestReconcile(t *testing.T) { jsonataConfigMapLister: listers.GetConfigMapLister(), jsonataDeploymentsLister: listers.GetDeploymentLister(), jsonataServiceLister: listers.GetServiceLister(), - jsonataEndpointLister: listers.GetEndpointsLister(), + jsonataEndpointSliceLister: listers.GetEndpointSliceLister(), jsonataSinkBindingLister: listers.GetSinkBindingLister(), cmCertificateLister: cmCertificatesListerAtomic, certificatesSecretLister: listers.GetSecretLister(), diff --git a/pkg/reconciler/inmemorychannel/controller/controller.go b/pkg/reconciler/inmemorychannel/controller/controller.go index b5405b0a9ed..2ec7f738361 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller.go +++ b/pkg/reconciler/inmemorychannel/controller/controller.go @@ -22,11 +22,13 @@ import ( "knative.dev/eventing/pkg/auth" "github.com/kelseyhightower/envconfig" + discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/client-go/tools/cache" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/kmeta" "knative.dev/pkg/logging" "knative.dev/pkg/system" @@ -40,9 +42,9 @@ import ( "knative.dev/eventing/pkg/reconciler/inmemorychannel/controller/config" "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" - "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints" "knative.dev/pkg/client/injection/kube/informers/core/v1/service" "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount" + "knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice" "knative.dev/pkg/client/injection/kube/informers/rbac/v1/rolebinding" secretinformer "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret" ) @@ -64,7 +66,7 @@ func NewController( inmemorychannelInformer := inmemorychannel.Get(ctx) deploymentInformer := deployment.Get(ctx) serviceInformer := service.Get(ctx) - endpointsInformer := endpoints.Get(ctx) + endpointSliceInformer := endpointslice.Get(ctx) serviceAccountInformer := serviceaccount.Get(ctx) roleBindingInformer := rolebinding.Get(ctx) secretInformer := secretinformer.Get(ctx) @@ -75,7 +77,7 @@ func NewController( systemNamespace: system.Namespace(), deploymentLister: deploymentInformer.Lister(), serviceLister: serviceInformer.Lister(), - endpointsLister: endpointsInformer.Lister(), + endpointSliceLister: endpointSliceInformer.Lister(), serviceAccountLister: serviceAccountInformer.Lister(), roleBindingLister: roleBindingInformer.Lister(), secretLister: secretInformer.Lister(), @@ -128,8 +130,8 @@ func NewController( FilterFunc: controller.FilterWithName(dispatcherName), Handler: controller.HandleAll(globalResync), }) - endpointsInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterWithName(dispatcherName), + endpointSliceInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: endpointSliceServiceNameFilter(dispatcherName), Handler: controller.HandleAll(globalResync), }) serviceAccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ @@ -158,3 +160,13 @@ func NewController( return impl } + +func endpointSliceServiceNameFilter(serviceName string) func(obj interface{}) bool { + return func(obj interface{}) bool { + acc, err := kmeta.DeletionHandlingAccessor(obj) + if err != nil { + return false + } + return acc.GetLabels()[discoveryv1.LabelServiceName] == serviceName + } +} diff --git a/pkg/reconciler/inmemorychannel/controller/controller_test.go b/pkg/reconciler/inmemorychannel/controller/controller_test.go index 869a25d19f4..b97e43a6a18 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller_test.go +++ b/pkg/reconciler/inmemorychannel/controller/controller_test.go @@ -37,9 +37,9 @@ import ( _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" - _ "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake" + _ "knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake" _ "knative.dev/pkg/client/injection/kube/informers/rbac/v1/rolebinding/fake" secretinformer "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret/fake" ) diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go index eb29fa4a35d..c3c1ef38167 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go @@ -32,11 +32,14 @@ import ( "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" appsv1listers "k8s.io/client-go/listers/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" + discoveryv1listers "k8s.io/client-go/listers/discovery/v1" rbacv1listers "k8s.io/client-go/listers/rbac/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -87,7 +90,7 @@ type Reconciler struct { dispatcherImage string deploymentLister appsv1listers.DeploymentLister serviceLister corev1listers.ServiceLister - endpointsLister corev1listers.EndpointsLister + endpointSliceLister discoveryv1listers.EndpointSliceLister serviceAccountLister corev1listers.ServiceAccountLister secretLister corev1listers.SecretLister roleBindingLister rbacv1listers.RoleBindingLister @@ -139,22 +142,36 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, imc *v1.InMemoryChannel) } imc.Status.MarkServiceTrue() - // Get the Dispatcher Service Endpoints and propagate the status to the Channel - // endpoints has the same name as the service, so not a bug. - e, err := r.endpointsLister.Endpoints(dispatcherNamespace).Get(dispatcherName) + // Get the Dispatcher Service EndpointSlices and propagate the status to the Channel. + epSlices, err := r.endpointSliceLister.EndpointSlices(dispatcherNamespace).List(labels.SelectorFromSet(labels.Set{ + discoveryv1.LabelServiceName: dispatcherName, + })) if err != nil { - if apierrs.IsNotFound(err) { - logging.FromContext(ctx).Error("Endpoints do not exist for dispatcher service") - imc.Status.MarkEndpointsFailed("DispatcherEndpointsDoesNotExist", "Dispatcher Endpoints does not exist") - } else { - logging.FromContext(ctx).Error("Unable to get the dispatcher endpoints", zap.Error(err)) - imc.Status.MarkEndpointsUnknown("DispatcherEndpointsGetFailed", "Failed to get dispatcher endpoints") - } + logging.FromContext(ctx).Error("Unable to get the dispatcher EndpointSlices", zap.Error(err)) + imc.Status.MarkEndpointsUnknown("DispatcherEndpointsGetFailed", "Failed to get dispatcher EndpointSlices") return err } - if len(e.Subsets) == 0 { - logging.FromContext(ctx).Error("No endpoints found for Dispatcher service", zap.Error(err)) + if len(epSlices) == 0 { + logging.FromContext(ctx).Error("No EndpointSlices found for Dispatcher service") + imc.Status.MarkEndpointsFailed("DispatcherEndpointsDoesNotExist", "Dispatcher EndpointSlices do not exist") + return errors.New("dispatcher EndpointSlices do not exist") + } + + hasReady := false + for _, eps := range epSlices { + for _, ep := range eps.Endpoints { + if ep.Conditions.Ready == nil || *ep.Conditions.Ready { + hasReady = true + break + } + } + if hasReady { + break + } + } + if !hasReady { + logging.FromContext(ctx).Error("No ready endpoints found for Dispatcher service") imc.Status.MarkEndpointsFailed("DispatcherEndpointsNotReady", "There are no endpoints ready for Dispatcher service") return errors.New("there are no endpoints ready for Dispatcher service") } diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go index b38ec80d619..1f4323c7bf6 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go @@ -46,6 +46,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -179,7 +180,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ makeFalseDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS), }, WantErr: false, @@ -203,7 +204,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ makeUnknownDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS), }, WantErr: false, @@ -238,7 +239,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeWarning, "DispatcherServiceFailed", `Reconciling dispatcher Service failed: service "imc-dispatcher" not found`), }, }, { - Name: "Endpoints does not exist", + Name: "EndpointSlices do not exist", Key: imcKey, Objects: []runtime.Object{ makeReadyDeployment(), @@ -251,19 +252,19 @@ func TestAllCases(t *testing.T) { WithInitInMemoryChannelConditions, WithInMemoryChannelDeploymentReady(), WithInMemoryChannelServiceReady(), - WithInMemoryChannelEndpointsNotReady("DispatcherEndpointsDoesNotExist", "Dispatcher Endpoints does not exist"), + WithInMemoryChannelEndpointsNotReady("DispatcherEndpointsDoesNotExist", "Dispatcher EndpointSlices do not exist"), ), }}, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "InternalError", `endpoints "imc-dispatcher" not found`), + Eventf(corev1.EventTypeWarning, "InternalError", `dispatcher EndpointSlices do not exist`), }, }, { - Name: "Endpoints not ready", + Name: "EndpointSlices not ready", Key: imcKey, Objects: []runtime.Object{ makeReadyDeployment(), makeService(), - makeEmptyEndpoints(), + makeNotReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS), }, WantErr: true, @@ -284,7 +285,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ makeReadyDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS, WithDeadLetterSink(imcDest), WithInMemoryChannelGeneration(imcGeneration), @@ -317,7 +318,7 @@ func TestAllCases(t *testing.T) { makeDLSServiceAsUnstructured(), makeReadyDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS, WithDeadLetterSink(imcDest), WithInMemoryChannelGeneration(imcGeneration), @@ -349,7 +350,7 @@ func TestAllCases(t *testing.T) { makeDLSServiceAsUnstructured(), makeReadyDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS, WithDeadLetterSink(duckv1.Destination{ Ref: imcDest.Ref, @@ -391,7 +392,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ makeReadyDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), makeDLSServiceAsUnstructured(), NewInMemoryChannel(imcName, testNS, WithDeadLetterSink(imcDest)), @@ -417,7 +418,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ makeReadyDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS), makeChannelServiceNotOwnedByUs(NewInMemoryChannel(imcName, testNS)), }, @@ -440,7 +441,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ makeReadyDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), makeDLSServiceAsUnstructured(), NewInMemoryChannel(imcName, testNS, WithInMemoryChannelSubscribers(subscribers), @@ -468,7 +469,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ makeReadyDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), makeDLSServiceAsUnstructured(), NewInMemoryChannel(imcName, testNS, WithInMemoryChannelSubscribers(subscribers), @@ -499,7 +500,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ makeReadyDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS), }, WantErr: true, @@ -529,7 +530,7 @@ func TestAllCases(t *testing.T) { makeReadyDeployment(), makeService(), makeTLSSecret(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS, WithDeadLetterSink(imcDest), WithInMemoryChannelGeneration(imcGeneration), @@ -574,7 +575,7 @@ func TestAllCases(t *testing.T) { makeReadyDeployment(), makeService(), makeTLSSecret(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS, WithDeadLetterSink(imcDest), WithInMemoryChannelGeneration(imcGeneration), @@ -620,7 +621,7 @@ func TestAllCases(t *testing.T) { makeDLSServiceAsUnstructured(), makeReadyDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS, WithDeadLetterSink(imcDest), WithInMemoryChannelGeneration(imcGeneration), @@ -657,7 +658,7 @@ func TestAllCases(t *testing.T) { makeDLSServiceAsUnstructured(), makeReadyDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS, WithDeadLetterSink(imcDest), WithInMemoryChannelGeneration(imcGeneration), @@ -692,7 +693,7 @@ func TestAllCases(t *testing.T) { makeDLSServiceAsUnstructured(), makeReadyDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS, WithDeadLetterSink(imcDest), WithInMemoryChannelGeneration(imcGeneration), @@ -726,7 +727,7 @@ func TestAllCases(t *testing.T) { makeDLSServiceAsUnstructured(), makeReadyDeployment(), makeService(), - makeReadyEndpoints(), + makeReadyEndpointSlice(), NewInMemoryChannel(imcName, testNS, WithDeadLetterSink(imcDest), WithInMemoryChannelGeneration(imcGeneration), @@ -774,14 +775,14 @@ func TestAllCases(t *testing.T) { } r := &Reconciler{ - kubeClientSet: fakekubeclient.Get(ctx), - systemNamespace: testNS, - deploymentLister: listers.GetDeploymentLister(), - serviceLister: listers.GetServiceLister(), - endpointsLister: listers.GetEndpointsLister(), - secretLister: listers.GetSecretLister(), - eventPolicyLister: listers.GetEventPolicyLister(), - uriResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)), + kubeClientSet: fakekubeclient.Get(ctx), + systemNamespace: testNS, + deploymentLister: listers.GetDeploymentLister(), + serviceLister: listers.GetServiceLister(), + endpointSliceLister: listers.GetEndpointSliceLister(), + secretLister: listers.GetSecretLister(), + eventPolicyLister: listers.GetEventPolicyLister(), + uriResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)), } return inmemorychannel.NewReconciler(ctx, logger, fakeeventingclient.Get(ctx), listers.GetInMemoryChannelLister(), @@ -813,7 +814,7 @@ func TestInNamespace(t *testing.T) { WithInMemoryScopeAnnotation(eventing.ScopeNamespace), WithDeadLetterSink(imcDest)), makeRoleBinding(systemNS, dispatcherName+"-"+testNS, "eventing-config-reader", NewInMemoryChannel(imcName, testNS)), - makeReadyEndpoints(), + makeReadyEndpointSlice(), }, WantErr: false, WantCreates: []runtime.Object{ @@ -857,7 +858,7 @@ func TestInNamespace(t *testing.T) { makeRoleBinding(systemNS, dispatcherName+"-"+testNS, "eventing-config-reader", NewInMemoryChannel(imcName, "knative-testing")), makeDispatcherDeployment(NewInMemoryChannel(imcName, testNS)), makeDispatcherService(testNS), - makeReadyEndpoints(), + makeReadyEndpointSlice(), }, WantErr: false, WantCreates: []runtime.Object{ @@ -891,7 +892,7 @@ func TestInNamespace(t *testing.T) { systemNamespace: systemNS, deploymentLister: listers.GetDeploymentLister(), serviceLister: listers.GetServiceLister(), - endpointsLister: listers.GetEndpointsLister(), + endpointSliceLister: listers.GetEndpointSliceLister(), serviceAccountLister: listers.GetServiceAccountLister(), roleBindingLister: listers.GetRoleBindingLister(), secretLister: listers.GetSecretLister(), @@ -1037,22 +1038,40 @@ func makeChannelServiceNotOwnedByUs(imc *v1.InMemoryChannel) *corev1.Service { } } -func makeEmptyEndpoints() *corev1.Endpoints { - return &corev1.Endpoints{ +func makeEmptyEndpointSlice() *discoveryv1.EndpointSlice { + return &discoveryv1.EndpointSlice{ TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Endpoints", + APIVersion: "discovery.k8s.io/v1", + Kind: "EndpointSlice", }, ObjectMeta: metav1.ObjectMeta{ Namespace: testNS, - Name: dispatcherName, + Name: dispatcherName + "-abc12", + Labels: map[string]string{ + discoveryv1.LabelServiceName: dispatcherName, + }, }, + AddressType: discoveryv1.AddressTypeIPv4, } } -func makeReadyEndpoints() *corev1.Endpoints { - e := makeEmptyEndpoints() - e.Subsets = []corev1.EndpointSubset{{Addresses: []corev1.EndpointAddress{{IP: "1.1.1.1"}}}} +func makeNotReadyEndpointSlice() *discoveryv1.EndpointSlice { + ready := false + e := makeEmptyEndpointSlice() + e.Endpoints = []discoveryv1.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, + }} + return e +} + +func makeReadyEndpointSlice() *discoveryv1.EndpointSlice { + ready := true + e := makeEmptyEndpointSlice() + e.Endpoints = []discoveryv1.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, + }} return e } diff --git a/pkg/reconciler/testing/endpointslice.go b/pkg/reconciler/testing/endpointslice.go new file mode 100644 index 00000000000..30a76f96b62 --- /dev/null +++ b/pkg/reconciler/testing/endpointslice.go @@ -0,0 +1,58 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// EndpointSliceOption enables further configuration of an EndpointSlice. +type EndpointSliceOption func(*discoveryv1.EndpointSlice) + +// NewEndpointSlice creates an EndpointSlice with EndpointSliceOptions +func NewEndpointSlice(name, namespace string, so ...EndpointSliceOption) *discoveryv1.EndpointSlice { + s := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + AddressType: discoveryv1.AddressTypeIPv4, + } + for _, opt := range so { + opt(s) + } + return s +} + +func WithEndpointSliceLabels(labels map[string]string) EndpointSliceOption { + return func(s *discoveryv1.EndpointSlice) { + s.ObjectMeta.Labels = labels + } +} + +func WithEndpointSliceAddresses(addrs ...discoveryv1.Endpoint) EndpointSliceOption { + return func(s *discoveryv1.EndpointSlice) { + s.Endpoints = addrs + } +} + +func WithEndpointSliceAnnotations(annotations map[string]string) EndpointSliceOption { + return func(s *discoveryv1.EndpointSlice) { + s.ObjectMeta.Annotations = annotations + } +} diff --git a/pkg/reconciler/testing/listers.go b/pkg/reconciler/testing/listers.go index 42b5c4c2364..2812390a6a8 100644 --- a/pkg/reconciler/testing/listers.go +++ b/pkg/reconciler/testing/listers.go @@ -19,6 +19,7 @@ package testing import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" fakeapiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" @@ -27,6 +28,7 @@ import ( fakekubeclientset "k8s.io/client-go/kubernetes/fake" appsv1listers "k8s.io/client-go/listers/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" + discoveryv1listers "k8s.io/client-go/listers/discovery/v1" rbacv1listers "k8s.io/client-go/listers/rbac/v1" "k8s.io/client-go/tools/cache" sourcesv1beta2 "knative.dev/eventing/pkg/apis/sources/v1beta2" @@ -138,6 +140,10 @@ func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister { return corev1listers.NewEndpointsLister(l.indexerFor(&corev1.Endpoints{})) } +func (l *Listers) GetEndpointSliceLister() discoveryv1listers.EndpointSliceLister { + return discoveryv1listers.NewEndpointSliceLister(l.indexerFor(&discoveryv1.EndpointSlice{})) +} + func (l *Listers) GetConfigMapLister() corev1listers.ConfigMapLister { return corev1listers.NewConfigMapLister(l.indexerFor(&corev1.ConfigMap{})) } diff --git a/pkg/reconciler/testing/v1/broker.go b/pkg/reconciler/testing/v1/broker.go index f4f11c848d2..5c6bf8b84dc 100644 --- a/pkg/reconciler/testing/v1/broker.go +++ b/pkg/reconciler/testing/v1/broker.go @@ -156,13 +156,13 @@ func WithTriggerChannelReady() BrokerOption { func WithFilterAvailable() BrokerOption { return func(b *v1.Broker) { - b.Status.PropagateFilterAvailability(v1.TestHelper.AvailableEndpoints()) + b.Status.PropagateFilterAvailability(v1.TestHelper.AvailableEndpointSlices()) } } func WithIngressAvailable() BrokerOption { return func(b *v1.Broker) { - b.Status.PropagateIngressAvailability(v1.TestHelper.AvailableEndpoints()) + b.Status.PropagateIngressAvailability(v1.TestHelper.AvailableEndpointSlices()) } } diff --git a/pkg/reconciler/testing/v1/endpointslice.go b/pkg/reconciler/testing/v1/endpointslice.go new file mode 100644 index 00000000000..30a76f96b62 --- /dev/null +++ b/pkg/reconciler/testing/v1/endpointslice.go @@ -0,0 +1,58 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// EndpointSliceOption enables further configuration of an EndpointSlice. +type EndpointSliceOption func(*discoveryv1.EndpointSlice) + +// NewEndpointSlice creates an EndpointSlice with EndpointSliceOptions +func NewEndpointSlice(name, namespace string, so ...EndpointSliceOption) *discoveryv1.EndpointSlice { + s := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + AddressType: discoveryv1.AddressTypeIPv4, + } + for _, opt := range so { + opt(s) + } + return s +} + +func WithEndpointSliceLabels(labels map[string]string) EndpointSliceOption { + return func(s *discoveryv1.EndpointSlice) { + s.ObjectMeta.Labels = labels + } +} + +func WithEndpointSliceAddresses(addrs ...discoveryv1.Endpoint) EndpointSliceOption { + return func(s *discoveryv1.EndpointSlice) { + s.Endpoints = addrs + } +} + +func WithEndpointSliceAnnotations(annotations map[string]string) EndpointSliceOption { + return func(s *discoveryv1.EndpointSlice) { + s.ObjectMeta.Annotations = annotations + } +} diff --git a/pkg/reconciler/testing/v1/listers.go b/pkg/reconciler/testing/v1/listers.go index 5b8a2c0c799..8e0536115df 100644 --- a/pkg/reconciler/testing/v1/listers.go +++ b/pkg/reconciler/testing/v1/listers.go @@ -22,6 +22,7 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" fakeapiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" @@ -31,6 +32,7 @@ import ( appsv1listers "k8s.io/client-go/listers/apps/v1" batchv1listers "k8s.io/client-go/listers/batch/v1" corev1listers "k8s.io/client-go/listers/core/v1" + discoveryv1listers "k8s.io/client-go/listers/discovery/v1" rbacv1listers "k8s.io/client-go/listers/rbac/v1" "k8s.io/client-go/tools/cache" eventingduck "knative.dev/eventing/pkg/apis/duck/v1" @@ -240,6 +242,10 @@ func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister { return corev1listers.NewEndpointsLister(l.indexerFor(&corev1.Endpoints{})) } +func (l *Listers) GetEndpointSliceLister() discoveryv1listers.EndpointSliceLister { + return discoveryv1listers.NewEndpointSliceLister(l.indexerFor(&discoveryv1.EndpointSlice{})) +} + func (l *Listers) GetConfigMapLister() corev1listers.ConfigMapLister { return corev1listers.NewConfigMapLister(l.indexerFor(&corev1.ConfigMap{})) } diff --git a/pkg/reconciler/testing/v1beta1/listers.go b/pkg/reconciler/testing/v1beta1/listers.go index 3f3fb63a474..3f12609c73a 100644 --- a/pkg/reconciler/testing/v1beta1/listers.go +++ b/pkg/reconciler/testing/v1beta1/listers.go @@ -19,6 +19,7 @@ package testing import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" fakeapiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" @@ -27,6 +28,7 @@ import ( fakekubeclientset "k8s.io/client-go/kubernetes/fake" appsv1listers "k8s.io/client-go/listers/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" + discoveryv1listers "k8s.io/client-go/listers/discovery/v1" rbacv1listers "k8s.io/client-go/listers/rbac/v1" "k8s.io/client-go/tools/cache" eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" @@ -136,6 +138,10 @@ func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister { return corev1listers.NewEndpointsLister(l.indexerFor(&corev1.Endpoints{})) } +func (l *Listers) GetEndpointSliceLister() discoveryv1listers.EndpointSliceLister { + return discoveryv1listers.NewEndpointSliceLister(l.indexerFor(&discoveryv1.EndpointSlice{})) +} + func (l *Listers) GetConfigMapLister() corev1listers.ConfigMapLister { return corev1listers.NewConfigMapLister(l.indexerFor(&corev1.ConfigMap{})) } diff --git a/pkg/reconciler/testing/v1beta2/listers.go b/pkg/reconciler/testing/v1beta2/listers.go index b18192c1df6..d3704c553cb 100644 --- a/pkg/reconciler/testing/v1beta2/listers.go +++ b/pkg/reconciler/testing/v1beta2/listers.go @@ -19,6 +19,7 @@ package testing import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" fakeapiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" @@ -27,6 +28,7 @@ import ( fakekubeclientset "k8s.io/client-go/kubernetes/fake" appsv1listers "k8s.io/client-go/listers/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" + discoveryv1listers "k8s.io/client-go/listers/discovery/v1" rbacv1listers "k8s.io/client-go/listers/rbac/v1" "k8s.io/client-go/tools/cache" eventingv1beta2 "knative.dev/eventing/pkg/apis/eventing/v1beta2" @@ -136,6 +138,10 @@ func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister { return corev1listers.NewEndpointsLister(l.indexerFor(&corev1.Endpoints{})) } +func (l *Listers) GetEndpointSliceLister() discoveryv1listers.EndpointSliceLister { + return discoveryv1listers.NewEndpointSliceLister(l.indexerFor(&discoveryv1.EndpointSlice{})) +} + func (l *Listers) GetConfigMapLister() corev1listers.ConfigMapLister { return corev1listers.NewConfigMapLister(l.indexerFor(&corev1.ConfigMap{})) } diff --git a/pkg/reconciler/testing/v1beta3/listers.go b/pkg/reconciler/testing/v1beta3/listers.go index 0c451f2db78..efb7ec69b06 100644 --- a/pkg/reconciler/testing/v1beta3/listers.go +++ b/pkg/reconciler/testing/v1beta3/listers.go @@ -19,6 +19,7 @@ package testing import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" fakeapiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" @@ -27,6 +28,7 @@ import ( fakekubeclientset "k8s.io/client-go/kubernetes/fake" appsv1listers "k8s.io/client-go/listers/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" + discoveryv1listers "k8s.io/client-go/listers/discovery/v1" rbacv1listers "k8s.io/client-go/listers/rbac/v1" "k8s.io/client-go/tools/cache" eventingv1beta3 "knative.dev/eventing/pkg/apis/eventing/v1beta3" @@ -136,6 +138,10 @@ func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister { return corev1listers.NewEndpointsLister(l.indexerFor(&corev1.Endpoints{})) } +func (l *Listers) GetEndpointSliceLister() discoveryv1listers.EndpointSliceLister { + return discoveryv1listers.NewEndpointSliceLister(l.indexerFor(&discoveryv1.EndpointSlice{})) +} + func (l *Listers) GetConfigMapLister() corev1listers.ConfigMapLister { return corev1listers.NewConfigMapLister(l.indexerFor(&corev1.ConfigMap{})) } diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/endpointslice.go b/vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/endpointslice.go new file mode 100644 index 00000000000..443ea622b12 --- /dev/null +++ b/vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/endpointslice.go @@ -0,0 +1,52 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package endpointslice + +import ( + context "context" + + v1 "k8s.io/client-go/informers/discovery/v1" + factory "knative.dev/pkg/client/injection/kube/informers/factory" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterInformer(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct{} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := factory.Get(ctx) + inf := f.Discovery().V1().EndpointSlices() + return context.WithValue(ctx, Key{}, inf), inf.Informer() +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context) v1.EndpointSliceInformer { + untyped := ctx.Value(Key{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch k8s.io/client-go/informers/discovery/v1.EndpointSliceInformer from context.") + } + return untyped.(v1.EndpointSliceInformer) +} diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake/fake.go b/vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake/fake.go new file mode 100644 index 00000000000..b7b693a02a3 --- /dev/null +++ b/vendor/knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake/fake.go @@ -0,0 +1,40 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package fake + +import ( + context "context" + + endpointslice "knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice" + fake "knative.dev/pkg/client/injection/kube/informers/factory/fake" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" +) + +var Get = endpointslice.Get + +func init() { + injection.Fake.RegisterInformer(withInformer) +} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := fake.Get(ctx) + inf := f.Discovery().V1().EndpointSlices() + return context.WithValue(ctx, endpointslice.Key{}, inf), inf.Informer() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 62bf303b1c7..94a43ff6e08 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1439,6 +1439,8 @@ knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake +knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice +knative.dev/pkg/client/injection/kube/informers/discovery/v1/endpointslice/fake knative.dev/pkg/client/injection/kube/informers/factory knative.dev/pkg/client/injection/kube/informers/factory/fake knative.dev/pkg/client/injection/kube/informers/factory/filtered