Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ rules:
- update
- patch
- apiGroups:
- ""
- "discovery.k8s.io"
resources:
- endpoints
- endpointslices
verbs:
- get
- list
Expand Down
10 changes: 9 additions & 1 deletion config/core/roles/controller-clusterroles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ rules:
- "secrets"
- "configmaps"
- "services"
- "endpoints"
- "events"
- "serviceaccounts"
- "pods"
Expand All @@ -41,6 +40,15 @@ rules:
- "patch"
- "watch"

- apiGroups:
- "discovery.k8s.io"
resources:
- "endpointslices"
verbs:
- "get"
- "list"
- "watch"

# Brokers and the namespace annotation controllers manipulate Deployments.
# RequestReply controller needs to manipulate StatefulSets
- apiGroups:
Expand Down
15 changes: 9 additions & 6 deletions pkg/apis/duck/lifecycle_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package duck

import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
)

// DeploymentIsAvailable determines if the provided deployment is available. Note that if it cannot
Expand All @@ -33,11 +33,14 @@ func DeploymentIsAvailable(d *appsv1.DeploymentStatus, def bool) bool {
return def
}

// EndpointsAreAvailable determines if the provided Endpoints are available.
func EndpointsAreAvailable(ep *corev1.Endpoints) bool {
for _, subset := range ep.Subsets {
if len(subset.Addresses) > 0 {
return true
// EndpointSlicesAreAvailable determines if the provided EndpointSlices have any ready endpoints.
func EndpointSlicesAreAvailable(epSlices []*discoveryv1.EndpointSlice) bool {
for _, eps := range epSlices {
for _, ep := range eps.Endpoints {
// Per the K8s API spec, a nil Ready value should be interpreted as ready.
if ep.Conditions.Ready == nil || *ep.Conditions.Ready {
return true
}
}
}
return false
Expand Down
14 changes: 7 additions & 7 deletions pkg/apis/eventing/v1/broker_lifecycle_mt.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package v1

import (
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"

"knative.dev/eventing/pkg/apis/duck"
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
Expand All @@ -27,11 +27,11 @@ func (bs *BrokerStatus) MarkIngressFailed(reason, format string, args ...interfa
bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionIngress, reason, format, args...)
}

func (bs *BrokerStatus) PropagateIngressAvailability(ep *corev1.Endpoints) {
if duck.EndpointsAreAvailable(ep) {
func (bs *BrokerStatus) PropagateIngressAvailability(epSlices []*discoveryv1.EndpointSlice) {
if duck.EndpointSlicesAreAvailable(epSlices) {
bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionIngress)
} else {
bs.MarkIngressFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name)
bs.MarkIngressFailed("EndpointSlicesUnavailable", "EndpointSlices are unavailable.")
}
}

Expand All @@ -57,10 +57,10 @@ func (bs *BrokerStatus) MarkFilterFailed(reason, format string, args ...interfac
bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionFilter, reason, format, args...)
}

func (bs *BrokerStatus) PropagateFilterAvailability(ep *corev1.Endpoints) {
if duck.EndpointsAreAvailable(ep) {
func (bs *BrokerStatus) PropagateFilterAvailability(epSlices []*discoveryv1.EndpointSlice) {
if duck.EndpointSlicesAreAvailable(epSlices) {
bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionFilter)
} else {
bs.MarkFilterFailed("EndpointsUnavailable", "Endpoints %q are unavailable.", ep.Name)
bs.MarkFilterFailed("EndpointSlicesUnavailable", "EndpointSlices are unavailable.")
}
}
17 changes: 9 additions & 8 deletions pkg/apis/eventing/v1/broker_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

Expand Down Expand Up @@ -499,13 +500,13 @@ func TestBrokerIsReady(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
bs := BrokerStatus{}
if test.markIngressReady != nil {
var ep *corev1.Endpoints
var epSlices []*discoveryv1.EndpointSlice
if *test.markIngressReady {
ep = TestHelper.AvailableEndpoints()
epSlices = TestHelper.AvailableEndpointSlices()
} else {
ep = TestHelper.UnavailableEndpoints()
epSlices = TestHelper.UnavailableEndpointSlices()
}
bs.PropagateIngressAvailability(ep)
bs.PropagateIngressAvailability(epSlices)
}
if test.markTriggerChannelReady != nil {
var c *eventingduckv1.ChannelableStatus
Expand Down Expand Up @@ -534,13 +535,13 @@ func TestBrokerIsReady(t *testing.T) {
}

if test.markFilterReady != nil {
var ep *corev1.Endpoints
var epSlices []*discoveryv1.EndpointSlice
if *test.markFilterReady {
ep = TestHelper.AvailableEndpoints()
epSlices = TestHelper.AvailableEndpointSlices()
} else {
ep = TestHelper.UnavailableEndpoints()
epSlices = TestHelper.UnavailableEndpointSlices()
}
bs.PropagateFilterAvailability(ep)
bs.PropagateFilterAvailability(epSlices)
}

if test.markAddressable == nil && test.address == nil {
Expand Down
35 changes: 17 additions & 18 deletions pkg/apis/eventing/v1/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1

import (
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"

"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand Down Expand Up @@ -59,9 +60,9 @@ func (testHelper) ReadySubscriptionStatus() *messagingv1.SubscriptionStatus {

func (t testHelper) ReadyBrokerStatus() *BrokerStatus {
bs := &BrokerStatus{}
bs.PropagateIngressAvailability(t.AvailableEndpoints())
bs.PropagateIngressAvailability(t.AvailableEndpointSlices())
bs.PropagateTriggerChannelReadiness(t.ReadyChannelStatus())
bs.PropagateFilterAvailability(t.AvailableEndpoints())
bs.PropagateFilterAvailability(t.AvailableEndpointSlices())
bs.SetAddress(&duckv1.Addressable{
URL: apis.HTTP("example.com"),
})
Expand All @@ -72,9 +73,9 @@ func (t testHelper) ReadyBrokerStatus() *BrokerStatus {

func (t testHelper) ReadyBrokerStatusWithoutDLS() *BrokerStatus {
bs := &BrokerStatus{}
bs.PropagateIngressAvailability(t.AvailableEndpoints())
bs.PropagateIngressAvailability(t.AvailableEndpointSlices())
bs.PropagateTriggerChannelReadiness(t.ReadyChannelStatus())
bs.PropagateFilterAvailability(t.AvailableEndpoints())
bs.PropagateFilterAvailability(t.AvailableEndpointSlices())
bs.SetAddress(&duckv1.Addressable{
URL: apis.HTTP("example.com"),
})
Expand Down Expand Up @@ -102,26 +103,24 @@ func (testHelper) FalseBrokerStatus() *BrokerStatus {
return bs
}

func (testHelper) UnavailableEndpoints() *corev1.Endpoints {
ep := &corev1.Endpoints{}
ep.Name = "unavailable"
ep.Subsets = []corev1.EndpointSubset{{
NotReadyAddresses: []corev1.EndpointAddress{{
IP: "127.0.0.1",
func (testHelper) UnavailableEndpointSlices() []*discoveryv1.EndpointSlice {
ready := false
return []*discoveryv1.EndpointSlice{{
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{"127.0.0.1"},
Conditions: discoveryv1.EndpointConditions{Ready: &ready},
}},
}}
return ep
}

func (testHelper) AvailableEndpoints() *corev1.Endpoints {
ep := &corev1.Endpoints{}
ep.Name = "available"
ep.Subsets = []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{
IP: "127.0.0.1",
func (testHelper) AvailableEndpointSlices() []*discoveryv1.EndpointSlice {
ready := true
return []*discoveryv1.EndpointSlice{{
Endpoints: []discoveryv1.Endpoint{{
Addresses: []string{"127.0.0.1"},
Conditions: discoveryv1.EndpointConditions{Ready: &ready},
}},
}}
return ep
}

func (testHelper) ReadyChannelStatus() *eventingduckv1.ChannelableStatus {
Expand Down
26 changes: 16 additions & 10 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -33,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
corev1listers "k8s.io/client-go/listers/core/v1"
discoveryv1listers "k8s.io/client-go/listers/discovery/v1"
"k8s.io/utils/pointer"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/resolver"
Expand Down Expand Up @@ -72,10 +74,10 @@ type Reconciler struct {
dynamicClientSet dynamic.Interface

// listers index properties about resources
endpointsLister corev1listers.EndpointsLister
subscriptionLister messaginglisters.SubscriptionLister
configmapLister corev1listers.ConfigMapLister
secretLister corev1listers.SecretLister
endpointSliceLister discoveryv1listers.EndpointSliceLister
subscriptionLister messaginglisters.SubscriptionLister
configmapLister corev1listers.ConfigMapLister
secretLister corev1listers.SecretLister

channelableTracker ducklib.ListableTracker

Expand Down Expand Up @@ -188,21 +190,25 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk

b.Status.PropagateTriggerChannelReadiness(channelStatus)

filterEndpoints, err := r.endpointsLister.Endpoints(system.Namespace()).Get(names.BrokerFilterName)
filterEpSlices, err := r.endpointSliceLister.EndpointSlices(system.Namespace()).List(labels.SelectorFromSet(labels.Set{
discoveryv1.LabelServiceName: names.BrokerFilterName,
}))
if err != nil {
logging.FromContext(ctx).Errorw("Problem getting endpoints for filter", zap.String("namespace", system.Namespace()), zap.Error(err))
logging.FromContext(ctx).Errorw("Problem getting EndpointSlices for filter", zap.String("namespace", system.Namespace()), zap.Error(err))
b.Status.MarkFilterFailed("ServiceFailure", "%v", err)
return err
}
b.Status.PropagateFilterAvailability(filterEndpoints)
b.Status.PropagateFilterAvailability(filterEpSlices)

ingressEndpoints, err := r.endpointsLister.Endpoints(system.Namespace()).Get(names.BrokerIngressName)
ingressEpSlices, err := r.endpointSliceLister.EndpointSlices(system.Namespace()).List(labels.SelectorFromSet(labels.Set{
discoveryv1.LabelServiceName: names.BrokerIngressName,
}))
if err != nil {
logging.FromContext(ctx).Errorw("Problem getting endpoints for ingress", zap.String("namespace", system.Namespace()), zap.Error(err))
logging.FromContext(ctx).Errorw("Problem getting EndpointSlices for ingress", zap.String("namespace", system.Namespace()), zap.Error(err))
b.Status.MarkIngressFailed("ServiceFailure", "%v", err)
return err
}
b.Status.PropagateIngressAvailability(ingressEndpoints)
b.Status.PropagateIngressAvailability(ingressEpSlices)

if b.Spec.Delivery != nil && b.Spec.Delivery.DeadLetterSink != nil {
deadLetterSinkAddr, err := r.uriResolver.AddressableFromDestinationV1(ctx, *b.Spec.Delivery.DeadLetterSink, b)
Expand Down
Loading
Loading