Skip to content
Merged
12 changes: 12 additions & 0 deletions config/core/resources/eventtransform.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,18 @@ spec:
type:
description: Type of condition.
type: string
policies:
description: List of applied EventPolicies
type: array
items:
type: object
properties:
apiVersion:
description: The API version of the applied EventPolicy. This indicates, which version of EventPolicy is supported by the resource.
type: string
name:
description: The name of the applied EventPolicy
type: string
jsonata:
description: JsonataTransformationStatus is the status associated with JsonataEventTransformationSpec.
type: object
Expand Down
19 changes: 18 additions & 1 deletion docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Resource Types:
<h3 id="duck.knative.dev/v1.AppliedEventPoliciesStatus">AppliedEventPoliciesStatus
</h3>
<p>
(<em>Appears on:</em><a href="#duck.knative.dev/v1.ChannelableStatus">ChannelableStatus</a>, <a href="#eventing.knative.dev/v1.BrokerStatus">BrokerStatus</a>, <a href="#eventing.knative.dev/v1alpha1.RequestReplyStatus">RequestReplyStatus</a>, <a href="#flows.knative.dev/v1.ParallelStatus">ParallelStatus</a>, <a href="#flows.knative.dev/v1.SequenceStatus">SequenceStatus</a>, <a href="#sinks.knative.dev/v1alpha1.IntegrationSinkStatus">IntegrationSinkStatus</a>, <a href="#sinks.knative.dev/v1alpha1.JobSinkStatus">JobSinkStatus</a>)
(<em>Appears on:</em><a href="#duck.knative.dev/v1.ChannelableStatus">ChannelableStatus</a>, <a href="#eventing.knative.dev/v1.BrokerStatus">BrokerStatus</a>, <a href="#eventing.knative.dev/v1alpha1.EventTransformStatus">EventTransformStatus</a>, <a href="#eventing.knative.dev/v1alpha1.RequestReplyStatus">RequestReplyStatus</a>, <a href="#flows.knative.dev/v1.ParallelStatus">ParallelStatus</a>, <a href="#flows.knative.dev/v1.SequenceStatus">SequenceStatus</a>, <a href="#sinks.knative.dev/v1alpha1.IntegrationSinkStatus">IntegrationSinkStatus</a>, <a href="#sinks.knative.dev/v1alpha1.JobSinkStatus">JobSinkStatus</a>)
</p>
<p>
<p>AppliedEventPoliciesStatus contains the list of policies which apply to a resource.
Expand Down Expand Up @@ -3515,6 +3515,23 @@ It exposes the endpoint as an URI to get events delivered.</p>
</tr>
<tr>
<td>
<code>AppliedEventPoliciesStatus</code><br/>
<em>
<a href="#duck.knative.dev/v1.AppliedEventPoliciesStatus">
AppliedEventPoliciesStatus
</a>
</em>
</td>
<td>
<p>
(Members of <code>AppliedEventPoliciesStatus</code> are embedded into this type.)
</p>
<em>(Optional)</em>
<p>AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this EventTransform.</p>
</td>
</tr>
<tr>
<td>
<code>jsonata</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.JsonataEventTransformationStatus">
Expand Down
22 changes: 20 additions & 2 deletions pkg/apis/eventing/v1alpha1/eventtransform_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
)

const (
TransformConditionAddressable apis.ConditionType = "Addressable"
TransformationConditionReady apis.ConditionType = "TransformationReady"
TransformConditionAddressable apis.ConditionType = "Addressable"
TransformationConditionReady apis.ConditionType = "TransformationReady"
TransformationEventPoliciesReady apis.ConditionType = "EventPoliciesReady"

TransformationAddressableEmptyURL string = "NoURL"
TransformationAddressableWaitingForServiceEndpoints string = "WaitingForServiceEndpoints"
Expand All @@ -48,6 +49,7 @@ const (
var TransformCondSet = apis.NewLivingConditionSet(
TransformationConditionReady,
TransformConditionAddressable,
TransformationEventPoliciesReady,
)

// transformJsonataConditionSet is the subset of conditions for the Jsonata transformation
Expand Down Expand Up @@ -211,3 +213,19 @@ func (ts *EventTransformStatus) SetAddresses(addresses ...duckv1.Addressable) {
}
ts.GetConditionSet().Manage(ts).MarkTrue(TransformConditionAddressable)
}

func (ts *EventTransformStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
TransformCondSet.Manage(ts).MarkFalse(TransformationEventPoliciesReady, reason, messageFormat, messageA...)
}

func (ts *EventTransformStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
TransformCondSet.Manage(ts).MarkUnknown(TransformationEventPoliciesReady, reason, messageFormat, messageA...)
}

func (ts *EventTransformStatus) MarkEventPoliciesTrue() {
TransformCondSet.Manage(ts).MarkTrue(TransformationEventPoliciesReady)
}

func (ts *EventTransformStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
TransformCondSet.Manage(ts).MarkTrueWithReason(TransformationEventPoliciesReady, reason, messageFormat, messageA...)
}
15 changes: 10 additions & 5 deletions pkg/apis/eventing/v1alpha1/eventtransform_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ func TestFullLifecycle(t *testing.T) {
topLevel := et.Status.GetCondition(apis.ConditionReady)
transformation := et.Status.GetCondition(TransformationConditionReady)
addressable := et.Status.GetCondition(TransformConditionAddressable)
eventPolicies := et.Status.GetCondition(TransformationEventPoliciesReady)

assert.Equal(t, corev1.ConditionUnknown, topLevel.Status)
assert.Equal(t, corev1.ConditionUnknown, transformation.Status)
assert.Equal(t, corev1.ConditionUnknown, addressable.Status)
assert.Len(t, et.Status.Conditions, 3)
assert.Equal(t, corev1.ConditionUnknown, eventPolicies.Status)
assert.Len(t, et.Status.Conditions, 4)
assert.Equal(t, false, et.Status.IsReady())

ds := appsv1.DeploymentStatus{
Expand All @@ -75,26 +77,29 @@ func TestFullLifecycle(t *testing.T) {

deploymentCondition := et.Status.GetCondition(TransformationJsonataDeploymentReady)
assert.Equal(t, corev1.ConditionTrue, deploymentCondition.Status)
assert.Len(t, et.Status.Conditions, 4)
assert.Len(t, et.Status.Conditions, 5)
assert.Equal(t, false, et.Status.IsReady())

transformationCondition := et.Status.GetCondition(TransformationConditionReady)
assert.Equal(t, corev1.ConditionUnknown, transformationCondition.Status, et)
assert.Len(t, et.Status.Conditions, 4)
assert.Len(t, et.Status.Conditions, 5)
assert.Equal(t, false, et.Status.IsReady())

et.Status.PropagateJsonataSinkBindingUnset()

transformationCondition = et.Status.GetCondition(TransformationConditionReady)
assert.Equal(t, corev1.ConditionTrue, transformationCondition.Status, et)
assert.Len(t, et.Status.Conditions, 5)
assert.Len(t, et.Status.Conditions, 6)
assert.Equal(t, false, et.Status.IsReady())

et.Status.SetAddresses(duckv1.Addressable{URL: apis.HTTPS("example.com")})
addrCondition := et.Status.GetCondition(TransformConditionAddressable)
assert.Equal(t, corev1.ConditionTrue, addrCondition.Status, et)
assert.Len(t, et.Status.Conditions, 5)
assert.Len(t, et.Status.Conditions, 6)
assert.Equal(t, false, et.Status.IsReady())

et.Status.MarkEventPoliciesTrue()
assert.Len(t, et.Status.Conditions, 6)
assert.Equal(t, true, et.Status.IsReady())

// All conditions are ready
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/eventing/v1alpha1/eventtransform_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"
Expand Down Expand Up @@ -130,6 +131,10 @@ type EventTransformStatus struct {
// +optional
duckv1.AddressStatus `json:",inline"`

// AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this EventTransform.
// +optional
eventingduckv1.AppliedEventPoliciesStatus `json:",inline"`

// JsonataTransformationStatus is the status associated with JsonataEventTransformationSpec.
// +optional
JsonataTransformationStatus *JsonataEventTransformationStatus `json:"jsonata,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions pkg/reconciler/eventtransform/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,30 @@ import (
"context"

cmclient "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned"
"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/certificates"
"knative.dev/eventing/pkg/eventingtls"
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/filtered"
serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered"
rolebindinginformer "knative.dev/pkg/client/injection/kube/informers/rbac/v1/rolebinding"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"
eventtransforminformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtransform"
sinkbindinginformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1/sinkbinding/filtered"
"knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1alpha1/eventtransform"
Expand All @@ -49,6 +54,10 @@ const (
NameLabelKey = "eventing.knative.dev/event-transform-name"
)

type envConfig struct {
AuthProxyImage string `envconfig:"AUTH_PROXY_IMAGE" required:"true"`
}

func NewController(
ctx context.Context,
cmw configmap.Watcher,
Expand All @@ -62,6 +71,8 @@ func NewController(
certificatesSecretInformer := secretinformer.Get(ctx, certificates.SecretLabelSelectorPair)
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector)
dynamicCertificatesInformer := certificates.NewDynamicCertificatesInformer()
eventPolicyInformer := eventpolicyinformer.Get(ctx)
rolebindingInformer := rolebindinginformer.Get(ctx)

// Create a custom informer as one in knative/pkg doesn't exist for endpoints.
jsonataEndpointFactory := informers.NewSharedInformerFactoryWithOptions(
Expand All @@ -73,6 +84,11 @@ func NewController(
)
jsonataEndpointInformer := jsonataEndpointFactory.Core().V1().Endpoints()

env := &envConfig{}
if err := envconfig.Process("", env); err != nil {
logging.FromContext(ctx).Panicf("unable to process EventTransform's required environment variables: %v", err)
}

var globalResync func()
var enqueueControllerOf func(interface{})

Expand Down Expand Up @@ -104,6 +120,10 @@ func NewController(
cmCertificateLister: dynamicCertificatesInformer.Lister(),
certificatesSecretLister: certificatesSecretInformer.Lister(),
trustBundleConfigMapLister: trustBundleConfigMapInformer.Lister(),
eventPolicyLister: eventPolicyInformer.Lister(),
rolebindingLister: rolebindingInformer.Lister(),
eventTransformLister: eventTransformInformer.Lister(),
authProxyImage: env.AuthProxyImage,
configWatcher: configWatcher,
}

Expand All @@ -126,6 +146,13 @@ func NewController(
jsonataConfigMapInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))
jsonataSinkBindingInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))

eventTransformGK := v1alpha1.SchemeGroupVersion.WithKind("EventTransform").GroupKind()
eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(
eventTransformInformer.Informer().GetIndexer(),
eventTransformGK,
impl.EnqueueKey,
))

// Start the factory after creating all necessary informers.
jsonataEndpointFactory.Start(ctx.Done())
jsonataEndpointFactory.WaitForCacheSync(ctx.Done())
Expand Down
Loading
Loading