-
Notifications
You must be signed in to change notification settings - Fork 18
Watch and sync changes to related resources #149
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,9 +39,12 @@ import ( | |
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
| "k8s.io/apimachinery/pkg/labels" | ||
| "k8s.io/apimachinery/pkg/runtime/schema" | ||
| "k8s.io/apimachinery/pkg/types" | ||
| "k8s.io/apimachinery/pkg/util/sets" | ||
| "k8s.io/utils/ptr" | ||
| ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" | ||
| "sigs.k8s.io/controller-runtime/pkg/cluster" | ||
| "sigs.k8s.io/controller-runtime/pkg/handler" | ||
| "sigs.k8s.io/controller-runtime/pkg/manager" | ||
| "sigs.k8s.io/controller-runtime/pkg/predicate" | ||
|
|
@@ -161,6 +164,73 @@ func Create( | |
| return nil, fmt.Errorf("failed to setup local-side watch: %w", err) | ||
| } | ||
|
|
||
| // Watch origin:kcp related resources so that changes to them trigger reconciliation | ||
| // of the owning primary object. Only related resources with a Watch config are covered. | ||
| watchedGVKs := sets.New[schema.GroupVersionKind]() | ||
| for _, relRes := range pubRes.Spec.Related { | ||
| if relRes.Origin != syncagentv1alpha1.RelatedResourceOriginKcp || relRes.Watch == nil { | ||
| continue | ||
| } | ||
|
|
||
| gvr := schema.GroupVersionResource{ | ||
| Group: relRes.Group, | ||
| Version: relRes.Version, | ||
| Resource: relRes.Resource, | ||
| } | ||
|
|
||
| // Use the local REST mapper to determine the Kind. | ||
| gvk, err := localManager.GetRESTMapper().KindFor(gvr) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the related resource originates in kcp (as per the first if statement in this loop), then why are we using the local (= service cluster) REST mapper to resolve it? Related resources support projection (i.e. changing their GVK when syncing from one side to another), so a GVK that exists in kcp is not necessarily the same as it is on the service cluster. Ideally this should use a restmapper of the origin side (whereever that might be). |
||
| if err != nil { | ||
| log.Warnw("Failed to determine Kind for origin:kcp related resource, skipping watch", "gvr", gvr, "error", err) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I fear this log message will never be seen by anyone and during runtime, this state will also not fix itself (the agent won't try to re-establish this watch at a later time, unless you restart the entire agent). To catch misconfigurations, wouldn't it make more sense to error out here? |
||
| continue | ||
| } | ||
|
|
||
| // Deduplicate: only set up one watch per GVK. | ||
| if watchedGVKs.Has(gvk) { | ||
| continue | ||
| } | ||
| watchedGVKs.Insert(gvk) | ||
|
|
||
| relatedDummy := &unstructured.Unstructured{} | ||
| relatedDummy.SetGroupVersionKind(gvk) | ||
|
|
||
| var enqueueForRelated mchandler.TypedEventHandlerFunc[*unstructured.Unstructured, mcreconcile.Request] | ||
|
|
||
| switch { | ||
| case relRes.Watch.ByOwner != nil: | ||
| ownerKind := relRes.Watch.ByOwner.Kind | ||
| enqueueForRelated = func(clusterName string, _ cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] { | ||
| return &byOwnerEventHandler{ | ||
| clusterName: clusterName, | ||
| ownerKind: ownerKind, | ||
| } | ||
| } | ||
|
|
||
| case relRes.Watch.ByLabel != nil: | ||
| labelTemplates := relRes.Watch.ByLabel | ||
| primaryDummy := remoteDummy.DeepCopy() | ||
| enqueueForRelated = func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] { | ||
| return &byLabelEventHandler{ | ||
| clusterName: clusterName, | ||
| client: cl.GetClient(), | ||
| primaryDummy: primaryDummy, | ||
| labelTemplates: labelTemplates, | ||
| log: log, | ||
| } | ||
| } | ||
|
|
||
| default: | ||
| log.Warnw("origin:kcp related resource has Watch set but neither byOwner nor byLabel configured, skipping", "gvk", gvk) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should definitely be an error. Misconfigured PublishedResources should IMHO scream loudly. If we have the XValidation rule such errors will be much less likely, but I would still error out if the PublishedResource is broken. |
||
| continue | ||
| } | ||
|
|
||
| if err := c.MultiClusterWatch(mcsource.TypedKind(relatedDummy, enqueueForRelated)); err != nil { | ||
| return nil, fmt.Errorf("failed to setup watch for origin:kcp related resource %v: %w", gvk, err) | ||
| } | ||
|
|
||
| log.Infow("Set up watch for origin:kcp related resource", "gvk", gvk) | ||
| } | ||
|
|
||
| log.Info("Done setting up unmanaged controller.") | ||
|
|
||
| return c, nil | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,144 @@ | ||
| /* | ||
| Copyright 2026 The KCP Authors. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package sync | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| "go.uber.org/zap" | ||
|
|
||
| "github.com/kcp-dev/api-syncagent/internal/sync/templating" | ||
|
|
||
| "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
| "k8s.io/apimachinery/pkg/types" | ||
| "k8s.io/client-go/util/workqueue" | ||
| ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" | ||
| "sigs.k8s.io/controller-runtime/pkg/event" | ||
| "sigs.k8s.io/controller-runtime/pkg/reconcile" | ||
| mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" | ||
| ) | ||
|
|
||
| // byOwnerEventHandler enqueues the primary object by inspecting the OwnerReferences | ||
| // of the changed related object and finding one with the configured Kind. | ||
| type byOwnerEventHandler struct { | ||
| clusterName string | ||
| ownerKind string | ||
| } | ||
|
|
||
| func (h *byOwnerEventHandler) Create(_ context.Context, evt event.TypedCreateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { | ||
| h.enqueue(evt.Object, q) | ||
| } | ||
|
|
||
| func (h *byOwnerEventHandler) Update(_ context.Context, evt event.TypedUpdateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { | ||
| h.enqueue(evt.ObjectNew, q) | ||
| } | ||
|
|
||
| func (h *byOwnerEventHandler) Delete(_ context.Context, evt event.TypedDeleteEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { | ||
| h.enqueue(evt.Object, q) | ||
| } | ||
|
|
||
| func (h *byOwnerEventHandler) Generic(_ context.Context, evt event.TypedGenericEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { | ||
| h.enqueue(evt.Object, q) | ||
| } | ||
|
|
||
| func (h *byOwnerEventHandler) enqueue(obj *unstructured.Unstructured, q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { | ||
| for _, ref := range obj.GetOwnerReferences() { | ||
| if ref.Kind == h.ownerKind { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should compare G, V and K, I think. |
||
| q.Add(mcreconcile.Request{ | ||
| ClusterName: h.clusterName, | ||
| Request: reconcile.Request{ | ||
| NamespacedName: types.NamespacedName{ | ||
| Namespace: obj.GetNamespace(), | ||
| Name: ref.Name, | ||
| }, | ||
| }, | ||
| }) | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // byLabelEventHandler enqueues primary objects by evaluating label templates against | ||
| // the changed related object and listing primaries matching the resulting label selector. | ||
| type byLabelEventHandler struct { | ||
| clusterName string | ||
| client ctrlruntimeclient.Client | ||
| primaryDummy *unstructured.Unstructured | ||
| labelTemplates map[string]string | ||
| log *zap.SugaredLogger | ||
| } | ||
|
|
||
| func (h *byLabelEventHandler) Create(ctx context.Context, evt event.TypedCreateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { | ||
| h.enqueue(ctx, evt.Object, q) | ||
| } | ||
|
|
||
| func (h *byLabelEventHandler) Update(ctx context.Context, evt event.TypedUpdateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { | ||
| h.enqueue(ctx, evt.ObjectNew, q) | ||
| } | ||
|
|
||
| func (h *byLabelEventHandler) Delete(ctx context.Context, evt event.TypedDeleteEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { | ||
| h.enqueue(ctx, evt.Object, q) | ||
| } | ||
|
|
||
| func (h *byLabelEventHandler) Generic(ctx context.Context, evt event.TypedGenericEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { | ||
| h.enqueue(ctx, evt.Object, q) | ||
| } | ||
|
|
||
| func (h *byLabelEventHandler) enqueue(ctx context.Context, obj *unstructured.Unstructured, q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { | ||
| // Build the template context using the changed related object. | ||
| data := map[string]any{ | ||
| "watchObject": map[string]any{ | ||
| "name": obj.GetName(), | ||
| "namespace": obj.GetNamespace(), | ||
| "labels": obj.GetLabels(), | ||
| }, | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please create an explicit struct for this templating context in https://github.com/kcp-dev/api-syncagent/blob/main/internal/sync/templating/related.go, which helps in documenting the available template variables. Please also add a corresponding New...() func, to ensure that the newly introduced struct always gets built correctly. You can then extend https://github.com/kcp-dev/api-syncagent/blob/main/docs/content/publish-resources/templating.md, which is trivial once the struct is setup. |
||
|
|
||
| // Evaluate each label template to build the selector. | ||
| matchingLabels := ctrlruntimeclient.MatchingLabels{} | ||
| for key, tpl := range h.labelTemplates { | ||
| value, err := templating.Render(tpl, data) | ||
| if err != nil { | ||
| h.log.Warnw("Failed to evaluate byLabel template", "key", key, "template", tpl, "error", err) | ||
| return | ||
| } | ||
| matchingLabels[key] = value | ||
| } | ||
|
|
||
| // List primary objects matching the derived label selector. | ||
| primaryList := &unstructured.UnstructuredList{} | ||
| primaryList.SetAPIVersion(h.primaryDummy.GetAPIVersion()) | ||
| primaryList.SetKind(h.primaryDummy.GetKind() + "List") | ||
|
|
||
| if err := h.client.List(ctx, primaryList, matchingLabels); err != nil { | ||
| h.log.Warnw("Failed to list primary objects for byLabel watch", "selector", matchingLabels, "error", err) | ||
| return | ||
| } | ||
|
|
||
| for i := range primaryList.Items { | ||
| primary := &primaryList.Items[i] | ||
| q.Add(mcreconcile.Request{ | ||
| ClusterName: h.clusterName, | ||
| Request: reconcile.Request{ | ||
| NamespacedName: types.NamespacedName{ | ||
| Namespace: primary.GetNamespace(), | ||
| Name: primary.GetName(), | ||
| }, | ||
| }, | ||
| }) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -256,6 +256,36 @@ type RelatedResourceSpec struct { | |
| // Mutation configures optional transformation rules for the related resource. | ||
| // Status mutations are only performed when the related resource originates in kcp. | ||
| Mutation *ResourceMutationSpec `json:"mutation,omitempty"` | ||
|
|
||
| // Watch configures how the agent identifies the owning primary object when a related | ||
| // resource with origin: kcp changes. When set, the agent sets up a watch on the related | ||
| // resource type and uses the configured rule to enqueue the correct primary object. | ||
| // Without this field, changes to origin:kcp related resources do not trigger reconciliation. | ||
| Watch *RelatedResourceWatch `json:"watch,omitempty"` | ||
| } | ||
|
|
||
| // RelatedResourceWatch configures how the watch handler maps a changed related resource | ||
| // back to its owning primary object. | ||
| // Exactly one of ByOwner or ByLabel must be set. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add an XValidation rule to ensure this. There are some examples in this file already. |
||
| type RelatedResourceWatch struct { | ||
| // ByOwner configures the watch handler to inspect the OwnerReferences of the changed | ||
| // object. When an OwnerReference with the given Kind is found, the referenced owner | ||
| // is enqueued as the primary object. | ||
| // +optional | ||
| ByOwner *RelatedResourceWatchByOwner `json:"byOwner,omitempty"` | ||
|
|
||
| // ByLabel configures the watch handler to list primary objects matching a label selector | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we rename this to "ByLabels" or "BySelector"? "ByLabel" is singular and I think this feature supports a selector with multiple key-value pairs, right? |
||
| // derived from the changed object. Each map key is a label key on the primary object; | ||
| // each value is a Go template expression evaluated with the changed object available as | ||
| // .watchObject (with fields .name, .namespace, .labels). | ||
| // +optional | ||
| ByLabel map[string]string `json:"byLabel,omitempty"` | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we support a full-blown metav1.LabelSelector here? |
||
| } | ||
|
|
||
| // RelatedResourceWatchByOwner configures reverse lookup via OwnerReferences. | ||
| type RelatedResourceWatchByOwner struct { | ||
| // Kind is the Kind to look for in the OwnerReferences of the changed related object. | ||
| Kind string `json:"kind"` | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should take a full GVK here and not just compare based on Kinds.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I think the opposite now. This is not required at all, is it? We already know the GVK of the primary object on both sides (origin and destination). We also already know the GVK/GVR of every related resource (like ConfigMaps). So what the agent needs to do is to watch all the GVR of related resources on the origin side (ConfigMaps, for example) and check if they have an ownerRef to the GVK of the primary object (a Cluster object, or whatever). And if so, cool, enqueue the primary object (i.e. the owner). We don't need any further configuration for the ByOwner functionality here, I think. |
||
| } | ||
|
|
||
| // RelatedResourceProjection describes how the source GVK of a related resource (i.e. | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't quite remember why we would only want to do this on the kcp side.. Couldn't we technically also have related objects on the service cluster side? It feels like this watching behaviour should work regardless of the origin side, no? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I.e. it should always watch on the origin side, regardless where that side actually is.