diff --git a/.gitignore b/.gitignore index 29d31af..ef68c8c 100644 --- a/.gitignore +++ b/.gitignore @@ -33,4 +33,10 @@ notes/ docs/public docs/resources/_gen/ docs/.hugo_build.lock -test/integration/clab-* \ No newline at end of file +test/integration/clab-* + +# Only for development and testing purposes +# To be removed after development of targetsource +# ignored in order to not add unnecessary logging messages +lab/dev/resources/targetsources +.scannerwork/ \ No newline at end of file diff --git a/api/v1alpha1/targetsource_types.go b/api/v1alpha1/targetsource_types.go index feea000..aa5b8ce 100644 --- a/api/v1alpha1/targetsource_types.go +++ b/api/v1alpha1/targetsource_types.go @@ -17,34 +17,191 @@ limitations under the License. package v1alpha1 import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // TargetSourceSpec defines the desired state of TargetSource // +kubebuilder:validation:Required type TargetSourceSpec struct { + // Provider defines the source of targets for this TargetSource + // Only one provider can be specified per TargetSource + // +kubebuilder:validation:Required Provider *ProviderSpec `json:"provider"` - // + + // TODO: implement in message processor + // Optional port to use for discovered targets if not specified by the provider + // +kubebuilder:validation:Optional + TargetPort int32 `json:"targetPort,omitempty"` + + // Optional labels to apply to all targets discovered by this TargetSource + // +kubebuilder:validation:Optional TargetLabels map[string]string `json:"targetLabels,omitempty"` + // The TargetProfile to use for targets discovered by this TargetSource + // +kubebuilder:validation:Required // +kubebuilder:validation:MinLength=1 TargetProfile string `json:"targetProfile"` } -// +kubebuilder:validation:ExactlyOneOf=http;consul +// ProviderSpec defines the source of targets for a TargetSource +// Only one provider can be specified per TargetSource +// +kubebuilder:validation:ExactlyOneOf=http type 
ProviderSpec struct { - HTTP *HTTPConfig `json:"http,omitempty"` - Consul *ConsulConfig `json:"consul,omitempty"` + // HTTP defines the configuration for a HTTP provider + HTTP *HTTPConfig `json:"http,omitempty"` } +// HTTPConfig defines the configuration for the HTTP provider +// +kubebuilder:validation:AtLeastOneOf=url;acceptPush type HTTPConfig struct { - // +kubebuilder:validation:MinLength=1 - URL string `json:"url"` + // URL of the HTTP endpoint to pull targets from + // If defined, the loader will periodically poll this endpoint for targets + // +kubebuilder:validation:Optional + URL string `json:"url,omitempty"` + + // If true, the loader will accept pushed target updates to the controller endpoint + // The endpoint will be /{namespace}/{targetsource}/ + // +kubebuilder:default=false + // +kubebuilder:validation:Optional + AcceptPush bool `json:"acceptPush,omitempty"` + + // Optional authorization configuration for accessing the HTTP endpoint + // +kubebuilder:validation:Optional + Authorization *AuthorizationSpec `json:"authorization,omitempty"` + + // Optional interval for polling the HTTP endpoint for targets + // TODO: increase default value + // +kubebuilder:default="30s" + // +kubebuilder:validation:Optional + PollInterval *metav1.Duration `json:"interval,omitempty"` + + // Optional timeout for HTTP requests to the endpoint + // +kubebuilder:default="10s" + // +kubebuilder:validation:Optional + Timeout *metav1.Duration `json:"timeout,omitempty"` + + // Optional TLS configuration for connecting to the HTTP endpoint + // +kubebuilder:validation:Optional + TLS *ClientTLSConfig `json:"tls,omitempty"` + + // Optional pagination configuration for parsing responses from the HTTP endpoint + // +kubebuilder:validation:Optional + Pagination *PaginationSpec `json:"pagination,omitempty"` + + // Optional mapping configuration for parsing responses from the HTTP endpoint + // +kubebuilder:validation:Optional + ResponseMapping *ResponseMappingSpec 
`json:"mapping,omitempty"` +} + +// +kubebuilder:validation:XValidation:rule="!(has(self.caBundle) && has(self.caBundleSecretRef))",message="caBundle and caBundleSecretRef are mutually exclusive" +type ClientTLSConfig struct { + // Skip TLS verification of the Provider's certificate. + // +kubebuilder:default:=false + InsecureSkipVerify bool `json:"insecureSkipVerify,omitempty"` + + // Base64-encoded bundle of PEM CAs which will be used to validate the certificate + // chain presented by the Provider. Only used if using HTTPS to connect to Provider and + // ignored for HTTP connections. + // Mutually exclusive with CABundleSecretRef. + // +optional + CABundle []byte `json:"caBundle,omitempty"` + + // Reference to a Secret containing a bundle of PEM-encoded CAs to use when + // verifying the certificate chain presented by the Provider when using HTTPS. + // Mutually exclusive with CABundle. + CABundleSecretRef *corev1.SecretKeySelector `json:"caBundleSecretRef,omitempty"` +} + +// AuthorizationSpec defines the configuration for authentication +// +kubebuilder:validation:ExactlyOneOf=basic;token +type AuthorizationSpec struct { + // Basic authentication configuration + Basic *BasicAuthSpec `json:"basic,omitempty"` + // Token-based authentication configuration + Token *TokenAuthSpec `json:"token,omitempty"` + // JWT *JWTAuthSpec `json:"jwt,omitempty"` + // MTLS +} + +// BasicAuthSpec defines the configuration for basic authentication +// Enforce EITHER inline creds OR secret ref +// +kubebuilder:validation:XValidation:rule="(has(self.credentialsSecretRef) && !has(self.username) && !has(self.password)) || (!has(self.credentialsSecretRef) && has(self.username) && has(self.password))",message="either credentialsSecretRef OR both username and password must be set, but not a mix" +type BasicAuthSpec struct { + // Username for basic auth + // Mutually exclusive with CredentialsSecretRef. 
+ Username string `json:"username,omitempty"` + // Password for basic auth + // Mutually exclusive with CredentialsSecretRef. + Password string `json:"password,omitempty"` + + // Reference to a Secret containing "username" and "password" keys to use for + // basic authentication when connecting to the Provider. + // Mutually exclusive with Username and Password. + CredentialsSecretRef *corev1.SecretKeySelector `json:"credentialsSecretRef,omitempty"` } -type ConsulConfig struct { +// TokenAuthSpec defines the configuration for token-based authentication +// +kubebuilder:validation:XValidation:rule="has(self.token) != has(self.tokenSecretRef)",message="either token or tokenSecretRef must be set, but not both" +type TokenAuthSpec struct { + // Scheme for the token, e.g. "Bearer" // +kubebuilder:validation:MinLength=1 - URL string `json:"url,omitempty"` + Scheme string `json:"scheme"` + // Token value for authentication + // Mutually exclusive with TokenSecretRef. + Token string `json:"token,omitempty"` + // Reference to a Secret containing a key with the token value to use for + // authentication when connecting to the Provider. + // Mutually exclusive with Token. 
+ TokenSecretRef *corev1.SecretKeySelector `json:"tokenSecretRef,omitempty"` +} + +// +kubebuilder(disabled):validation:XValidation:rule="!((has(self.token) || has(self.tokenSecretRef)) && (has(self.key) || has(self.signingKeySecretRef) || has(self.claims)))",message="static JWT token and generated JWT configuration cannot be combined" +// +kubebuilder(disabled):validation:XValidation:rule="!has(self.signingKeySecretRef) || self.algorithm != \"\"",message="algorithm must be specified when generating a JWT" +// type JWTAuthSpec struct { +// // Static pre-generated JWT +// Token string `json:"token,omitempty"` +// TokenSecretRef *corev1.SecretKeySelector `json:"tokenSecretRef,omitempty"` +// // Optional: generate JWT dynamically +// Claims map[string]string `json:"claims,omitempty"` +// Key string `json:"key,omitempty"` +// SigningKeySecretRef *corev1.SecretKeySelector `json:"signingKeySecretRef,omitempty"` +// // HS256, RS256, ES256, etc. +// Algorithm string `json:"algorithm,omitempty"` +// TTL *metav1.Duration `json:"ttl,omitempty"` +// } + +// PaginationSpec defines the configuration for paginating through responses from providers +type PaginationSpec struct { + // JSONPath-style expression to extract the list of targets from the response + // Example: "results" + ItemsField string `json:"itemsField,omitempty"` + + // JSONPath-style expression to extract the next page token or URL from the response for pagination + // Example: "next" + NextField string `json:"nextField,omitempty"` +} + +// JSONPath-style expressions to extract target fields from the response +// and map them to the corresponding Target fields. 
+type ResponseMappingSpec struct { + // JSONPath expression to extract the target name from the response + // +kubebuilder:validation:Required + Name string `json:"name"` + + // JSONPath expression to extract the target address from the response + // +kubebuilder:validation:Required + Address string `json:"address"` + + // JSONPath expression to extract the target port from the response + // +kubebuilder:validation:Optional + Port string `json:"port,omitempty"` + + // JSONPath expression to extract the target labels from the response + // The extracted labels will be merged with the static TargetLabels defined in the TargetSourceSpec, + // with values from the response taking precedence in case of conflicts. + // +kubebuilder:validation:Optional + Labels map[string]string `json:"labels,omitempty"` } // TargetSourceStatus defines the observed state of TargetSource diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 61e81fd..dc4b784 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -46,6 +46,76 @@ func (in *APIConfig) DeepCopy() *APIConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AuthorizationSpec) DeepCopyInto(out *AuthorizationSpec) { + *out = *in + if in.Basic != nil { + in, out := &in.Basic, &out.Basic + *out = new(BasicAuthSpec) + (*in).DeepCopyInto(*out) + } + if in.Token != nil { + in, out := &in.Token, &out.Token + *out = new(TokenAuthSpec) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthorizationSpec. +func (in *AuthorizationSpec) DeepCopy() *AuthorizationSpec { + if in == nil { + return nil + } + out := new(AuthorizationSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *BasicAuthSpec) DeepCopyInto(out *BasicAuthSpec) { + *out = *in + if in.CredentialsSecretRef != nil { + in, out := &in.CredentialsSecretRef, &out.CredentialsSecretRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BasicAuthSpec. +func (in *BasicAuthSpec) DeepCopy() *BasicAuthSpec { + if in == nil { + return nil + } + out := new(BasicAuthSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClientTLSConfig) DeepCopyInto(out *ClientTLSConfig) { + *out = *in + if in.CABundle != nil { + in, out := &in.CABundle, &out.CABundle + *out = make([]byte, len(*in)) + copy(*out, *in) + } + if in.CABundleSecretRef != nil { + in, out := &in.CABundleSecretRef, &out.CABundleSecretRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClientTLSConfig. +func (in *ClientTLSConfig) DeepCopy() *ClientTLSConfig { + if in == nil { + return nil + } + out := new(ClientTLSConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Cluster) DeepCopyInto(out *Cluster) { *out = *in @@ -213,21 +283,6 @@ func (in *ClusterTargetState) DeepCopy() *ClusterTargetState { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ConsulConfig) DeepCopyInto(out *ConsulConfig) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConsulConfig. 
-func (in *ConsulConfig) DeepCopy() *ConsulConfig { - if in == nil { - return nil - } - out := new(ConsulConfig) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GRPCKeepAliveConfig) DeepCopyInto(out *GRPCKeepAliveConfig) { *out = *in @@ -273,6 +328,36 @@ func (in *GRPCTunnelConfig) DeepCopy() *GRPCTunnelConfig { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HTTPConfig) DeepCopyInto(out *HTTPConfig) { *out = *in + if in.Authorization != nil { + in, out := &in.Authorization, &out.Authorization + *out = new(AuthorizationSpec) + (*in).DeepCopyInto(*out) + } + if in.PollInterval != nil { + in, out := &in.PollInterval, &out.PollInterval + *out = new(metav1.Duration) + **out = **in + } + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(metav1.Duration) + **out = **in + } + if in.TLS != nil { + in, out := &in.TLS, &out.TLS + *out = new(ClientTLSConfig) + (*in).DeepCopyInto(*out) + } + if in.Pagination != nil { + in, out := &in.Pagination, &out.Pagination + *out = new(PaginationSpec) + **out = **in + } + if in.ResponseMapping != nil { + in, out := &in.ResponseMapping, &out.ResponseMapping + *out = new(ResponseMappingSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPConfig. @@ -587,6 +672,21 @@ func (in *OutputStatus) DeepCopy() *OutputStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PaginationSpec) DeepCopyInto(out *PaginationSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PaginationSpec. 
+func (in *PaginationSpec) DeepCopy() *PaginationSpec { + if in == nil { + return nil + } + out := new(PaginationSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Pipeline) DeepCopyInto(out *Pipeline) { *out = *in @@ -824,12 +924,7 @@ func (in *ProviderSpec) DeepCopyInto(out *ProviderSpec) { if in.HTTP != nil { in, out := &in.HTTP, &out.HTTP *out = new(HTTPConfig) - **out = **in - } - if in.Consul != nil { - in, out := &in.Consul, &out.Consul - *out = new(ConsulConfig) - **out = **in + (*in).DeepCopyInto(*out) } } @@ -843,6 +938,28 @@ func (in *ProviderSpec) DeepCopy() *ProviderSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResponseMappingSpec) DeepCopyInto(out *ResponseMappingSpec) { + *out = *in + if in.Labels != nil { + in, out := &in.Labels, &out.Labels + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResponseMappingSpec. +func (in *ResponseMappingSpec) DeepCopy() *ResponseMappingSpec { + if in == nil { + return nil + } + out := new(ResponseMappingSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ServiceConfig) DeepCopyInto(out *ServiceConfig) { *out = *in @@ -1384,6 +1501,26 @@ func (in *TargetTLSConfig) DeepCopy() *TargetTLSConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *TokenAuthSpec) DeepCopyInto(out *TokenAuthSpec) { + *out = *in + if in.TokenSecretRef != nil { + in, out := &in.TokenSecretRef, &out.TokenSecretRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TokenAuthSpec. +func (in *TokenAuthSpec) DeepCopy() *TokenAuthSpec { + if in == nil { + return nil + } + out := new(TokenAuthSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TunnelTargetPolicy) DeepCopyInto(out *TunnelTargetPolicy) { *out = *in diff --git a/cmd/main.go b/cmd/main.go index 4c37a0d..3bb04f7 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -28,6 +28,7 @@ import ( certmanagerv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" @@ -40,6 +41,8 @@ import ( operatorv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/apiserver" "github.com/gnmic/operator/internal/controller" + "github.com/gnmic/operator/internal/controller/discovery" + "github.com/gnmic/operator/internal/controller/discovery/core" webhookv1alpha1 "github.com/gnmic/operator/internal/webhook/v1alpha1" //+kubebuilder:scaffold:imports ) @@ -64,6 +67,8 @@ func main() { var probeAddr string var devMode bool var apiAddr string + var discoveryChunkSize int + var discoveryBufferSize int flag.StringVar(&apiAddr, "api-bind-address", "", "The address the operator API endpoint binds to. 
Disabled if empty.") flag.BoolVar(&devMode, "dev-mode", false, "Enable development mode.") flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") @@ -71,6 +76,8 @@ func main() { flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") + flag.IntVar(&discoveryChunkSize, "discovery-chunk-size", 100, "Maximum number of targets/events sent in a single discovery message.") + flag.IntVar(&discoveryBufferSize, "discovery-buffer-size", 10, "Amount of discovery messages that can be queued in the channel buffer.") opts := zap.Options{ Development: devMode, } @@ -79,6 +86,8 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) + discoveryRegistry := discovery.NewRegistry[types.NamespacedName, core.DiscoveryRegistryValue]() + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, Metrics: metricsserver.Options{BindAddress: metricsAddr}, @@ -117,8 +126,11 @@ func main() { os.Exit(1) } if err := (&controller.TargetSourceReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + BufferSize: discoveryBufferSize, + ChunkSize: discoveryChunkSize, + DiscoveryRegistry: discoveryRegistry, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "TargetSource") os.Exit(1) @@ -220,6 +232,7 @@ func main() { if apiAddr != "" { apiServer := apiserver.New(apiAddr, clusterReconciler) + apiServer.DiscoveryRegistry = discoveryRegistry err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { errCh := make(chan error) go func() { diff --git a/config/crd/bases/operator.gnmic.dev_targetsources.yaml b/config/crd/bases/operator.gnmic.dev_targetsources.yaml index f373822..6851ad7 100644 --- a/config/crd/bases/operator.gnmic.dev_targetsources.yaml +++ 
b/config/crd/bases/operator.gnmic.dev_targetsources.yaml @@ -40,31 +40,250 @@ spec: description: TargetSourceSpec defines the desired state of TargetSource properties: provider: + description: |- + Provider defines the source of targets for this TargetSource + Only one provider can be specified per TargetSource properties: - consul: - properties: - url: - minLength: 1 - type: string - type: object http: + description: HTTP defines the configuration for a HTTP provider properties: + acceptPush: + default: false + description: |- + If true, the loader will accept pushed target updates to the controller endpoint + The endpoint will be /{namespace}/{targetsource}/ + type: boolean + authorization: + description: Optional authorization configuration for accessing + the HTTP endpoint + properties: + basic: + description: Basic authentication configuration + properties: + credentialsSecretRef: + description: |- + Reference to a Secret containing "username" and "password" keys to use for + basic authentication when connecting to the Provider. + Mutually exclusive with Username and Password. + properties: + key: + description: The key of the secret to select from. Must + be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + password: + description: |- + Password for basic auth + Mutually exclusive with CredentialsSecretRef. + type: string + username: + description: |- + Username for basic auth + Mutually exclusive with CredentialsSecretRef. 
+ type: string + type: object + x-kubernetes-validations: + - message: either credentialsSecretRef OR both username + and password must be set, but not a mix + rule: (has(self.credentialsSecretRef) && !has(self.username) + && !has(self.password)) || (!has(self.credentialsSecretRef) + && has(self.username) && has(self.password)) + token: + description: Token-based authentication configuration + properties: + scheme: + description: Scheme for the token, e.g. "Bearer" + minLength: 1 + type: string + token: + description: |- + Token value for authentication + Mutually exclusive with TokenSecretRef. + type: string + tokenSecretRef: + description: |- + Reference to a Secret containing a key with the token value to use for + authentication when connecting to the Provider. + Mutually exclusive with Token. + properties: + key: + description: The key of the secret to select from. Must + be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. 
+ More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + required: + - scheme + type: object + x-kubernetes-validations: + - message: either token or tokenSecretRef must be set, + but not both + rule: has(self.token) != has(self.tokenSecretRef) + type: object + x-kubernetes-validations: + - message: exactly one of the fields in [basic token] must + be set + rule: '[has(self.basic),has(self.token)].filter(x,x==true).size() + == 1' + interval: + default: 30s + description: Optional interval for polling the HTTP endpoint + for targets + type: string + mapping: + description: Optional mapping configuration for parsing responses + from the HTTP endpoint + properties: + address: + description: JSONPath expression to extract the target + address from the response + type: string + labels: + additionalProperties: + type: string + description: |- + JSONPath expression to extract the target labels from the response + The extracted labels will be merged with the static TargetLabels defined in the TargetSourceSpec, + with values from the response taking precedence in case of conflicts. 
+ type: object + name: + description: JSONPath expression to extract the target + name from the response + type: string + port: + description: JSONPath expression to extract the target + port from the response + type: string + required: + - address + - name + type: object + pagination: + description: Optional pagination configuration for parsing + responses from the HTTP endpoint + properties: + itemsField: + description: |- + JSONPath-style expression to extract the list of targets from the response + Example: "results" + type: string + nextField: + description: |- + JSONPath-style expression to extract the next page token or URL from the response for pagination + Example: "next" + type: string + type: object + timeout: + default: 10s + description: Optional timeout for HTTP requests to the endpoint + type: string + tls: + description: Optional TLS configuration for connecting to + the HTTP endpoint + properties: + caBundle: + description: |- + Base64-encoded bundle of PEM CAs which will be used to validate the certificate + chain presented by the Provider. Only used if using HTTPS to connect to Provider and + ignored for HTTP connections. + Mutually exclusive with CABundleSecretRef. + format: byte + type: string + caBundleSecretRef: + description: |- + Reference to a Secret containing a bundle of PEM-encoded CAs to use when + verifying the certificate chain presented by the Provider when using HTTPS. + Mutually exclusive with CABundle. + properties: + key: + description: The key of the secret to select from. Must + be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. 
+ More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the Secret or its key + must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + insecureSkipVerify: + default: false + description: Skip TLS verification of the Provider's certificate. + type: boolean + type: object + x-kubernetes-validations: + - message: caBundle and caBundleSecretRef are mutually exclusive + rule: '!(has(self.caBundle) && has(self.caBundleSecretRef))' url: - minLength: 1 + description: |- + URL of the HTTP endpoint to pull targets from + If defined, the loader will periodically poll this endpoint for targets type: string - required: - - url type: object + x-kubernetes-validations: + - message: at least one of the fields in [url acceptPush] must + be set + rule: '[has(self.url),has(self.acceptPush)].filter(x,x==true).size() + >= 1' type: object x-kubernetes-validations: - - message: exactly one of the fields in [http consul] must be set - rule: '[has(self.http),has(self.consul)].filter(x,x==true).size() - == 1' + - message: exactly one of the fields in [http] must be set + rule: '[has(self.http)].filter(x,x==true).size() == 1' targetLabels: additionalProperties: type: string + description: Optional labels to apply to all targets discovered by + this TargetSource type: object + targetPort: + description: Optional port to use for discovered targets if not specified + by the provider + format: int32 + type: integer targetProfile: + description: The TargetProfile to use for targets discovered by this + TargetSource minLength: 1 type: string required: diff --git a/go.mod b/go.mod index f236ded..827da2a 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.5 require ( github.com/cert-manager/cert-manager v1.19.3 github.com/go-logr/logr v1.4.3 + github.com/google/uuid v1.6.0 github.com/onsi/ginkgo/v2 v2.27.3 github.com/onsi/gomega v1.38.3 
github.com/openconfig/gnmic/pkg/api v0.1.10 @@ -47,7 +48,6 @@ require ( github.com/google/gnostic-models v0.7.1 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/pprof v0.0.0-20251213031049-b05bdaca462f // indirect - github.com/google/uuid v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect diff --git a/internal/apiserver/apiserver.go b/internal/apiserver/apiserver.go index f31abaa..5eb88b8 100644 --- a/internal/apiserver/apiserver.go +++ b/internal/apiserver/apiserver.go @@ -5,11 +5,16 @@ import ( "net/http" "github.com/gnmic/operator/internal/controller" + "github.com/gnmic/operator/internal/controller/discovery" + "github.com/gnmic/operator/internal/controller/discovery/core" + "k8s.io/apimachinery/pkg/types" ) type APIServer struct { Server *http.Server clusterReconciler *controller.ClusterReconciler + + DiscoveryRegistry *discovery.Registry[types.NamespacedName, core.DiscoveryRegistryValue] } func New(addr string, clusterReconciler *controller.ClusterReconciler) *APIServer { diff --git a/internal/controller/const.go b/internal/controller/const.go index b5196b8..5ef2e8f 100644 --- a/internal/controller/const.go +++ b/internal/controller/const.go @@ -21,6 +21,8 @@ const ( LabelCertType = "operator.gnmic.dev/cert-type" LabelValueCertTypeClient = "client" LabelValueCertTypeTunnel = "tunnel" + + LabelTargetSourceFinalizer = "operator.gnmic.dev/targetsource-finalizer" ) const ( diff --git a/internal/controller/discovery/client.go b/internal/controller/discovery/client.go index 3bc7ef7..9ccbb68 100644 --- a/internal/controller/discovery/client.go +++ b/internal/controller/discovery/client.go @@ -1,22 +1,29 @@ package discovery -// File may become obsolete, depends on how the logic to compare desired vs. 
existing state will get implemented - import ( "context" + "fmt" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" ) -func FetchExistingTargets(ctx context.Context, c client.Client, ts gnmicv1alpha1.TargetSource) ([]gnmicv1alpha1.Target, error) { +func fetchExistingTargets( + ctx context.Context, + c client.Client, + ts *gnmicv1alpha1.TargetSource, +) ([]gnmicv1alpha1.Target, error) { + var targetList gnmicv1alpha1.TargetList - err := c.List(ctx, &targetList, + err := c.List( + ctx, + &targetList, client.InNamespace(ts.Namespace), client.MatchingLabels{ - "gnmic.io/source": ts.Name, + LabelTargetSourceName: ts.Name, }, ) if err != nil { @@ -25,3 +32,44 @@ func FetchExistingTargets(ctx context.Context, c client.Client, ts gnmicv1alpha1 return targetList.Items, nil } + +// Helper: GetSecretValues returns values from a secret +// If keys are provided -> returns only those keys +// If keys is empty -> returns entire secret data +func GetSecretValues( + ctx context.Context, + c client.Client, + namespace string, + secretRef string, + keys ...string, +) (map[string]string, error) { + var secret corev1.Secret + if err := c.Get(ctx, + client.ObjectKey{ + Name: secretRef, + Namespace: namespace, + }, &secret); err != nil { + return nil, fmt.Errorf("failed to get secret %s/%s: %w", namespace, secretRef, err) + } + + result := make(map[string]string) + + // Return full secret + if len(keys) == 0 { + for k, v := range secret.Data { + result[k] = string(v) + } + return result, nil + } + + // Return specific keys + for _, key := range keys { + val, ok := secret.Data[key] + if !ok { + return nil, fmt.Errorf("key %s missing in secret %s/%s", key, namespace, secretRef) + } + result[key] = string(val) + } + + return result, nil +} diff --git a/internal/controller/discovery/const.go b/internal/controller/discovery/const.go new file mode 100644 index 0000000..ac7a57f --- /dev/null +++ 
b/internal/controller/discovery/const.go @@ -0,0 +1,6 @@ +package discovery + +const ( + // Labels + LabelTargetSourceName = "operator.gnmic.dev/targetsource" +) diff --git a/internal/controller/discovery/core/loader_interface.go b/internal/controller/discovery/core/loader_interface.go index 82036a8..895258a 100644 --- a/internal/controller/discovery/core/loader_interface.go +++ b/internal/controller/discovery/core/loader_interface.go @@ -2,8 +2,6 @@ package core import ( "context" - - gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" ) // Loader defines a pluggable TargetSource loader interface @@ -12,12 +10,7 @@ type Loader interface { // Name returns the unique loader identifier e.g. "pull" Name() string - // Start begins discovery and pushes target snapshots into the out channel - // The loader must stop cleanly when ctx is cancelled - Start( - ctx context.Context, - targetsourceName string, - spec gnmicv1alpha1.TargetSourceSpec, - out chan<- []DiscoveryMessage, - ) error + // Run begins discovery and pushes target snapshots or events into the out channel + // The loader must stop cleanly when ctx is canceled + Run(ctx context.Context, out chan<- []DiscoveryMessage) error } diff --git a/internal/controller/discovery/core/message_interface.go b/internal/controller/discovery/core/message_interface.go new file mode 100644 index 0000000..0836bc6 --- /dev/null +++ b/internal/controller/discovery/core/message_interface.go @@ -0,0 +1,8 @@ +package core + +type DiscoveryMessage interface { + isDiscoveryMessage() +} + +func (DiscoveryEvent) isDiscoveryMessage() {} +func (DiscoverySnapshot) isDiscoveryMessage() {} diff --git a/internal/controller/discovery/core/types.go b/internal/controller/discovery/core/types.go index 406a22b..99605b9 100644 --- a/internal/controller/discovery/core/types.go +++ b/internal/controller/discovery/core/types.go @@ -1,5 +1,39 @@ package core +import ( + "context" + + "k8s.io/apimachinery/pkg/types" +) + +// DiscoveryRegistryValue 
represents the controller-owned runtime state +// with its configuration for a single TargetSource +type DiscoveryRegistryValue struct { + // Channel is the outbound communication channel used by discovery + // components (loaders, webhooks, etc.) to emit discovery messages + Channel chan<- []DiscoveryMessage + // Stop cancels the discovery context associated with this registry entry + Stop context.CancelFunc + + CommonLoaderConfig *CommonLoaderConfig +} + +type CommonLoaderConfig struct { + TargetsourceNN types.NamespacedName + ChunkSize int + AcceptPush bool +} + +// EventAction represents the type of a discovery event +type EventAction int + +const ( + // EventDelete indicates that a target should be removed + EventDelete EventAction = iota + // EventApply indicates that a target should be applied (created or updated) + EventApply +) + // DiscoveredTarget represents a target discovered from an external source // before it is materialized as a Kubernetes Target CR type DiscoveredTarget struct { @@ -8,15 +42,14 @@ type DiscoveredTarget struct { Labels map[string]string } -const ( - DELETE DiscoveryEvent = 0 - CREATE DiscoveryEvent = 1 - UPDATE DiscoveryEvent = 2 -) - -type DiscoveryEvent int - -type DiscoveryMessage struct { +type DiscoveryEvent struct { Target DiscoveredTarget - Event DiscoveryEvent + Event EventAction +} + +type DiscoverySnapshot struct { + SnapshotID string + ChunkIndex int + TotalChunks int + Targets []DiscoveredTarget } diff --git a/internal/controller/discovery/discovery.go b/internal/controller/discovery/discovery.go new file mode 100644 index 0000000..491cdfb --- /dev/null +++ b/internal/controller/discovery/discovery.go @@ -0,0 +1,15 @@ +package discovery + +// Package discovery implements the discovery runtime subsystem. +// +// The discovery subsystem is responsible for: +// - Receiving discovery data from external providers (loaders, webhooks). +// - Applying discovered state to Kubernetes Targets. 
+// +// The package is structured into the following subpackages: +// - core: message contracts, snapshot/event types, and transport helpers. +// - message processor: snapshot + event target state application logic. +// - loaders: target discovery providers (HTTP, webhook, etc.). +// - registry: key -> channel registry. +// +// At the moment, the targetsource controller imports specific subpackages explicitly. diff --git a/internal/controller/discovery/loader.go b/internal/controller/discovery/loader.go deleted file mode 100644 index 3d45f42..0000000 --- a/internal/controller/discovery/loader.go +++ /dev/null @@ -1,24 +0,0 @@ -package discovery - -import ( - "fmt" - - gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" - "github.com/gnmic/operator/internal/controller/discovery/core" - pull "github.com/gnmic/operator/internal/controller/discovery/loaders/pull" -) - -// NewLoader creates a loader by name -func NewLoader(name string, namespace string, spec gnmicv1alpha1.TargetSourceSpec) (core.Loader, error) { - loaderName := namespace + "/" + name - - switch { - case spec.Provider.HTTP != nil: - return pull.New(), nil - case spec.Provider.Consul != nil: - return nil, fmt.Errorf("unknown targetsource loader, check TargetSource CRD for %s", loaderName) - default: - return nil, fmt.Errorf("unknown targetsource loader, check TargetSource CRD for %s", loaderName) - } - -} diff --git a/internal/controller/discovery/loaders.go b/internal/controller/discovery/loaders.go new file mode 100644 index 0000000..1e5ea46 --- /dev/null +++ b/internal/controller/discovery/loaders.go @@ -0,0 +1,166 @@ +package discovery + +import ( + "context" + "fmt" + + "sigs.k8s.io/controller-runtime/pkg/client" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" + http "github.com/gnmic/operator/internal/controller/discovery/loaders/http" +) + +// NewLoader creates a loader by name +func NewLoader(ctx context.Context, c 
client.Client, cfg *core.CommonLoaderConfig, spec gnmicv1alpha1.TargetSourceSpec) (core.Loader, error) { + + switch { + case spec.Provider.HTTP != nil: + httpSpec := *spec.Provider.HTTP + cfg.AcceptPush = httpSpec.AcceptPush + + // TODO: watch secrets -> if secret changes reconcile has to be executed + if httpSpec.Authorization != nil { + if err := resolveAuthorizationIntoSpec( + ctx, + c, + cfg.TargetsourceNN.Namespace, + httpSpec.Authorization, + ); err != nil { + return nil, err + } + } + if httpSpec.TLS != nil { + if err := resolveTLSIntoSpec( + ctx, + c, + cfg.TargetsourceNN.Namespace, + httpSpec.TLS, + ); err != nil { + return nil, err + } + } + + return http.New(*cfg, httpSpec), nil + default: + return nil, fmt.Errorf("unknown targetsource provider, check TargetSource CRD for %s", cfg.TargetsourceNN) + } +} + +func resolveAuthorizationIntoSpec( + ctx context.Context, + c client.Client, + namespace string, + authSpec *gnmicv1alpha1.AuthorizationSpec, +) error { + if authSpec == nil { + return nil + } + auth := authSpec + + switch { + case auth.Basic != nil: + b := auth.Basic + + if b.CredentialsSecretRef != nil { + values, err := GetSecretValues( + ctx, + c, + namespace, + b.CredentialsSecretRef.Name, + "username", + "password", + ) + if err != nil { + return err + } + b.Username = values["username"] + b.Password = values["password"] + } + + case auth.Token != nil: + t := auth.Token + if t.TokenSecretRef != nil { + key := "token" + if t.TokenSecretRef.Key != "" { + key = t.TokenSecretRef.Key + } + values, err := GetSecretValues( + ctx, + c, + namespace, + t.TokenSecretRef.Name, + key, + ) + if err != nil { + return err + } + t.Token = values[key] + } + + // case auth.JWT != nil: + // jwt := auth.JWT + // if jwt.TokenSecretRef != nil { + // values, err := GetSecretValues( + // ctx, + // c, + // namespaceName, + // jwt.TokenSecretRef.Name, + // "token", + // ) + // if err != nil { + // return err + // } + // jwt.Token = values[jwt.TokenSecretRef.Key] + // } + 
// if jwt.SigningKeySecretRef != nil { + // values, err := GetSecretValues( + // ctx, + // c, + // namespaceName, + // jwt.SigningKeySecretRef.Name, + // "key", + // ) + // if err != nil { + // return err + // } + // jwt.Key = values[jwt.SigningKeySecretRef.Key] + + // } + } + + return nil +} + +func resolveTLSIntoSpec( + ctx context.Context, + c client.Client, + namespace string, + tlsSpec *gnmicv1alpha1.ClientTLSConfig, +) error { + if tlsSpec == nil { + return nil + } + tls := tlsSpec + + if tls.CABundleSecretRef != nil { + key := "ca.crt" + if tls.CABundleSecretRef.Key != "" { + key = tls.CABundleSecretRef.Key + } + values, err := GetSecretValues( + ctx, + c, + namespace, + tls.CABundleSecretRef.Name, + key, + ) + if err != nil { + return err + } + // convert string to []byte + tls.CABundle = []byte(values[key]) + } + + return nil +} diff --git a/internal/controller/discovery/loaders/all/all.go b/internal/controller/discovery/loaders/all/all.go deleted file mode 100644 index d05604b..0000000 --- a/internal/controller/discovery/loaders/all/all.go +++ /dev/null @@ -1,6 +0,0 @@ -package all - -import ( - _ "github.com/gnmic/operator/internal/controller/discovery/loaders/pull" - // _ "github.com/gnmic/operator/internal/controller/targetsource/loaders/push" -) diff --git a/internal/controller/discovery/loaders/http/loader.go b/internal/controller/discovery/loaders/http/loader.go new file mode 100644 index 0000000..d956799 --- /dev/null +++ b/internal/controller/discovery/loaders/http/loader.go @@ -0,0 +1,201 @@ +package http + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/google/uuid" + + "sigs.k8s.io/controller-runtime/pkg/log" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" + loaderUtils "github.com/gnmic/operator/internal/controller/discovery/loaders/utils" +) + +// Loader implements the HTTP pull discovery mechanism 
+type Loader struct { + loaderCfg core.CommonLoaderConfig + spec gnmicv1alpha1.HTTPConfig +} + +// New instantiates the http loader with the provided config +func New(cfg core.CommonLoaderConfig, httpConfig gnmicv1alpha1.HTTPConfig) core.Loader { + return &Loader{loaderCfg: cfg, spec: httpConfig} +} + +func (l *Loader) Name() string { + return "http" +} + +func (l *Loader) Run(ctx context.Context, out chan<- []core.DiscoveryMessage) error { + if l.spec.URL == "" { + return nil + } + + logger := log.FromContext(ctx).WithValues( + "component", "loader", + "name", l.Name(), + "targetsource", l.loaderCfg.TargetsourceNN, + ) + + logger.Info( + "HTTP loader started", + "targetsource", l.loaderCfg.TargetsourceNN.Name, + "namespace", l.loaderCfg.TargetsourceNN.Namespace, + ) + + logger.Info("HTTP loader started") + + // Input Validation of spec + // if l.spec.URL == "nil" { + // return errors.New("HTTP loader requires spec.provider.http to be set") + // } + + client, err := l.buildHTTPClient() + if err != nil { + return fmt.Errorf("failed to build HTTP client: %w", err) + } + interval := l.spec.PollInterval.Duration + ticker := time.NewTicker(interval) + defer ticker.Stop() + + logger.Info( + "HTTP polling discovery started", + "interval", interval.String(), + "url", l.spec.URL, + ) + + // helper function to fetch targets and emit discovery messages + fetchAndEmit := func() { + targets, err := l.fetchTargetsFromHTTPEndpoint(ctx, client) + if err != nil { + logger.Error( + err, + "Failed to fetch targets from HTTP endpoint", + "url", l.spec.URL, + ) + return + } + + snapshotID := fmt.Sprintf("%s-%s-%s", l.loaderCfg.TargetsourceNN.Namespace, l.loaderCfg.TargetsourceNN.Name, uuid.NewString()) + if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.loaderCfg.ChunkSize); err != nil { + logger.Error( + err, + "Failed to send discovery snapshot", + "snapshotID", snapshotID, + "targets", len(targets), + ) + return + } + + logger.Info( + "Discovery snapshot sent", + 
"snapshotID", snapshotID, + "targets", len(targets), + ) + } + + // Immediate fetch on startup + fetchAndEmit() + + // Periodic fetch + for { + select { + case <-ctx.Done(): + logger.Info("HTTP loader stopped") + return nil + + case <-ticker.C: + fetchAndEmit() + } + } +} + +func (l *Loader) buildHTTPClient() (*http.Client, error) { + tlsConfig := &tls.Config{ + InsecureSkipVerify: l.spec.TLS != nil && l.spec.TLS.InsecureSkipVerify, + } + + if l.spec.TLS != nil && len(l.spec.TLS.CABundle) > 0 { + certPool := x509.NewCertPool() + if ok := certPool.AppendCertsFromPEM(l.spec.TLS.CABundle); !ok { + return nil, fmt.Errorf("Failed to parse CA bundle for TargetSource %s/%s\n", l.loaderCfg.TargetsourceNN.Namespace, l.loaderCfg.TargetsourceNN.Name) + } + tlsConfig.RootCAs = certPool + } + + return &http.Client{ + Timeout: l.spec.Timeout.Duration, + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + }, nil +} + +func (l *Loader) fetchTargetsFromHTTPEndpoint( + ctx context.Context, + client *http.Client, +) ([]core.DiscoveredTarget, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, l.spec.URL, nil) + if err != nil { + return nil, fmt.Errorf("creating HTTP request failed: %w", err) + } + + req.Header.Set("Accept", "application/json") + l.applyAuthorization(req) + + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("HTTP request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected HTTP status code: %d", resp.StatusCode) + } + + var targets []core.DiscoveredTarget + if err := json.NewDecoder(resp.Body).Decode(&targets); err != nil { + return nil, fmt.Errorf("failed to decode HTTP response: %w", err) + } + + return targets, nil +} + +func (l *Loader) applyAuthorization(req *http.Request) { + auth := l.spec.Authorization + if auth == nil { + return + } + + switch { + case auth.Basic != nil: + req.SetBasicAuth( + auth.Basic.Username, + 
auth.Basic.Password, + ) + + case auth.Token != nil: + req.Header.Set( + "Authorization", + fmt.Sprintf("%s %s", + auth.Token.Scheme, + auth.Token.Token, + ), + ) + + // case auth.JWT != nil: + // if auth.JWT.Token != "" { + // req.Header.Set( + // "Authorization", + // fmt.Sprintf("Bearer %s", auth.JWT.Token), + // ) + // } + } +} diff --git a/internal/controller/discovery/loaders/http/loader_test.go b/internal/controller/discovery/loaders/http/loader_test.go new file mode 100644 index 0000000..d02cfda --- /dev/null +++ b/internal/controller/discovery/loaders/http/loader_test.go @@ -0,0 +1 @@ +package http diff --git a/internal/controller/discovery/loaders/pull/loader.go b/internal/controller/discovery/loaders/pull/loader.go deleted file mode 100644 index 5540ea2..0000000 --- a/internal/controller/discovery/loaders/pull/loader.go +++ /dev/null @@ -1,78 +0,0 @@ -package pull - -import ( - "context" - "time" - - "sigs.k8s.io/controller-runtime/pkg/log" - - gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" - "github.com/gnmic/operator/internal/controller/discovery/core" -) - -type Loader struct{} - -// New instantiates the pull loader -func New() core.Loader { - return &Loader{} -} - -func (l *Loader) Name() string { - return "pull" -} - -func (l *Loader) Start( - ctx context.Context, - targetsourceName string, - spec gnmicv1alpha1.TargetSourceSpec, - out chan<- []core.DiscoveryMessage, -) error { - logger := log.FromContext(ctx).WithValues("loader", l.Name()) - - logger.Info("HTTP pull loader started") - - // Only for debugging: emit a static snapshot every 30 seconds - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - logger.Info("HTTP pull loader stopped") - return nil - - case <-ticker.C: - // Example snapshot (placeholder) - targets := []core.DiscoveryMessage{ - { - Target: core.DiscoveredTarget{ - Name: "ceos1", - Address: "clab-3-nodes-ceos1:6030", - Labels: map[string]string{"TargetSource": 
targetsourceName}, - }, - Event: core.CREATE, - }, - { - Target: core.DiscoveredTarget{ - Name: "leaf1", - Address: "clab-3-nodes-leaf1:57400", - Labels: map[string]string{"TargetSource": targetsourceName}, - }, - Event: core.CREATE, - }, - } - - // Non-blocking context-aware send - select { - case out <- targets: - logger.V(1).Info( - "emitted target snapshot", - "count", len(targets), - ) - case <-ctx.Done(): - logger.Info("context cancelled while emitting targets") - return nil - } - } - } -} diff --git a/internal/controller/discovery/loaders/pull/loader_test.go b/internal/controller/discovery/loaders/pull/loader_test.go deleted file mode 100644 index 0493bec..0000000 --- a/internal/controller/discovery/loaders/pull/loader_test.go +++ /dev/null @@ -1 +0,0 @@ -package pull diff --git a/internal/controller/discovery/loaders/push/loader.go b/internal/controller/discovery/loaders/push/loader.go deleted file mode 100644 index 92f0ccc..0000000 --- a/internal/controller/discovery/loaders/push/loader.go +++ /dev/null @@ -1,4 +0,0 @@ -package push - -// this file implements the logic receive target updates via HTTP push -// REST API defined internal/apiserver diff --git a/internal/controller/discovery/loaders/push/loader_test.go b/internal/controller/discovery/loaders/push/loader_test.go deleted file mode 100644 index 63fdf61..0000000 --- a/internal/controller/discovery/loaders/push/loader_test.go +++ /dev/null @@ -1 +0,0 @@ -package push diff --git a/internal/controller/discovery/loaders/utils/send.go b/internal/controller/discovery/loaders/utils/send.go new file mode 100644 index 0000000..3cfba8d --- /dev/null +++ b/internal/controller/discovery/loaders/utils/send.go @@ -0,0 +1,94 @@ +package utils + +import ( + "context" + "fmt" + + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +// sendMessages sends discovery messages over a channel in a context-aware manner +func sendMessages(ctx context.Context, out chan<- []core.DiscoveryMessage, messages 
[]core.DiscoveryMessage) error { + select { + case <-ctx.Done(): + return ctx.Err() + case out <- messages: + } + return nil +} + +// forEachChunk iterates over ranges [start,end) for a total count using the provided chunkSize +func forEachChunk(total, chunkSize int, fn func(start, end int) error) error { + for i := 0; i < total; i += chunkSize { + end := i + chunkSize + if end > total { + end = total + } + if err := fn(i, end); err != nil { + return err + } + } + return nil +} + +// createDiscoverySnapshots takes a list of discovered targets and returns chunked DiscoverySnapshots +func createDiscoverySnapshots(targets []core.DiscoveredTarget, snapshotID string, chunkSize int) []core.DiscoverySnapshot { + var snapshots []core.DiscoverySnapshot + totalTargets := len(targets) + totalChunks := (totalTargets + chunkSize - 1) / chunkSize + + _ = forEachChunk(totalTargets, chunkSize, func(i, end int) error { + chunk := targets[i:end] + snapshots = append(snapshots, core.DiscoverySnapshot{ + Targets: chunk, + SnapshotID: snapshotID, + ChunkIndex: i / chunkSize, + TotalChunks: totalChunks, + }) + return nil + }) + + return snapshots +} + +// SendSnapshot sends discovered targets as a snapshot over a channel in chunks +func SendSnapshot(ctx context.Context, out chan<- []core.DiscoveryMessage, targets []core.DiscoveredTarget, snapshotID string, chunkSize int) error { + if len(targets) == 0 { + return fmt.Errorf("no targets in Snapshot") + } + + snapshots := createDiscoverySnapshots(targets, snapshotID, chunkSize) + for _, snapshot := range snapshots { + // Convert DiscoverySnapshot to DiscoveryMessage + messages := make([]core.DiscoveryMessage, 1) + messages[0] = snapshot + + if err := sendMessages(ctx, out, messages); err != nil { + return err + } + } + + return nil +} + +func eventsToMessages(events []core.DiscoveryEvent) []core.DiscoveryMessage { + message := make([]core.DiscoveryMessage, len(events)) + for i, event := range events { + message[i] = event + } + return 
message +} + +// SendEvents sends discovery messages over channel in a context-aware manner +func SendEvents(ctx context.Context, out chan<- []core.DiscoveryMessage, events []core.DiscoveryEvent, chunkSize int) error { + if len(events) == 0 { + return fmt.Errorf("no events to process") + } + + messages := eventsToMessages(events) + total := len(messages) + + return forEachChunk(total, chunkSize, func(i, end int) error { + return sendMessages(ctx, out, messages[i:end]) + }) +} diff --git a/internal/controller/discovery/mapper.go b/internal/controller/discovery/mapper.go deleted file mode 100644 index 18470b2..0000000 --- a/internal/controller/discovery/mapper.go +++ /dev/null @@ -1,4 +0,0 @@ -package discovery - -// This file makes diff between existing and new targets -// file decides which targets to create/update/delete diff --git a/internal/controller/discovery/mapper_test.go b/internal/controller/discovery/mapper_test.go deleted file mode 100644 index 5844159..0000000 --- a/internal/controller/discovery/mapper_test.go +++ /dev/null @@ -1 +0,0 @@ -package discovery diff --git a/internal/controller/discovery/message_processor.go b/internal/controller/discovery/message_processor.go new file mode 100644 index 0000000..f7aafb1 --- /dev/null +++ b/internal/controller/discovery/message_processor.go @@ -0,0 +1,296 @@ +package discovery + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" + "github.com/go-logr/logr" +) + +type snapshotBuffer struct { + snapshotID string + totalChunks int + received map[int][]core.DiscoveredTarget + complete bool +} + +// MessageProcessor consumes discovery messages and applies them to Kubernetes +type MessageProcessor struct { + client client.Client + scheme *runtime.Scheme + targetSource 
*gnmicv1alpha1.TargetSource + in <-chan []core.DiscoveryMessage + queue []core.DiscoveryMessage + activeSnapshot *snapshotBuffer + // Events are deferred while snapshot is in progress + deferredEvents []core.DiscoveryEvent +} + +// NewMessageProcessor wires a MessageProcessor instance +func NewMessageProcessor(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage) *MessageProcessor { + return &MessageProcessor{ + client: c, + scheme: s, + targetSource: ts, + in: in, + } +} + +// Run is a long‑running loop that receives target snapshots +// and reconciles Target CRs accordingly +func (m *MessageProcessor) Run(ctx context.Context) error { + logger := log.FromContext(ctx).WithValues( + "component", "message-processor", + "targetsource", m.targetSource.Name, + "namespace", m.targetSource.Namespace, + ) + + logger.Info("Message processor started") + + for { + select { + case batch, ok := <-m.in: + if !ok { + // Channel closed, pipeline is shutting down + logger.Info("Input channel closed; stopping message processor") + return nil + } + m.queue = append(m.queue, batch...) 
+ + case <-ctx.Done(): + logger.Info("Context was canceled; stopping message processor") + return nil + } + + for len(m.queue) > 0 { + if ctx.Err() != nil { + return ctx.Err() + } + + msg := m.queue[0] + m.queue = m.queue[1:] + + if err := m.processMessage(ctx, msg, logger); err != nil { + // Returning error lets the supervisor (controller) + // tear down and restart the pipeline via reconciliation + logger.Info( + "Could not process the message", + "error", err, + ) + return nil + } + + } + } +} + +func (m *MessageProcessor) processMessage(ctx context.Context, message core.DiscoveryMessage, logger logr.Logger) error { + if err := ctx.Err(); err != nil { + return err + } + + // Type assert to determine if this is a snapshot or event + switch msg := message.(type) { + case core.DiscoverySnapshot: + // Collect snapshot chunks + logger.Info( + "Received discovery snapshot chunk", + "snapshotID", msg.SnapshotID, + "chunkIndex", msg.ChunkIndex, + "targets", len(msg.Targets), + ) + return m.processSnapshot(ctx, msg, logger) + + case core.DiscoveryEvent: + // Process individual event-driven update + logger.Info( + "Received discovery event", + "event", msg.Event, + "target", msg.Target.Name, + ) + return m.processEvent(ctx, msg, logger) + + default: + return fmt.Errorf("Unknown discovery message type %T", msg) + } +} + +// processSnapshot takes a complete snapshot of discovered targets and reconciles Target CRs accordingly +func (m *MessageProcessor) processSnapshot(ctx context.Context, chunk core.DiscoverySnapshot, logger logr.Logger) error { + if m.activeSnapshot == nil { + return m.startNewSnapshot(ctx, chunk, logger) + } + + snapshot := m.activeSnapshot + // Check if a new snapshot arrived + if snapshot.snapshotID != chunk.SnapshotID { + // If current snapshot is complete apply it first + if snapshot.complete { + if err := m.applySnapshot(ctx, snapshot, logger); err != nil { + return err + } + } else { + // If a new snapshot is started before the old one completed + 
// the old one can be discarded + logger.Info( + "Discarded incomplete discovery snapshot", + "snapshotID", snapshot.snapshotID, + ) + } + + // Start collecting the new snapshot + return m.startNewSnapshot(ctx, chunk, logger) + } + + return m.collectSnapshot(ctx, chunk, logger) +} + +func (m *MessageProcessor) startNewSnapshot(ctx context.Context, chunk core.DiscoverySnapshot, logger logr.Logger) error { + m.activeSnapshot = &snapshotBuffer{ + snapshotID: chunk.SnapshotID, + totalChunks: chunk.TotalChunks, + received: make(map[int][]core.DiscoveredTarget), + complete: false, + } + // Delete buffered events that will be current with new snapshot + m.deferredEvents = nil + + return m.collectSnapshot(ctx, chunk, logger) +} + +func (m *MessageProcessor) collectSnapshot(ctx context.Context, chunk core.DiscoverySnapshot, logger logr.Logger) error { + snapshot := m.activeSnapshot + + if chunk.TotalChunks != snapshot.totalChunks { + logger.Error( + nil, + "Snapshot totalChunks mismatch", + "snapshotID", snapshot.snapshotID, + ) + return fmt.Errorf("snapshot totalChunks mismatch") + } + if chunk.ChunkIndex < 0 || chunk.ChunkIndex >= snapshot.totalChunks { + logger.Error( + nil, + "Snapshot chunk index out of range", + "chunkIndex", chunk.ChunkIndex, + ) + m.resetSnapshot() + return fmt.Errorf("invalid chunk index") + } + if _, exists := snapshot.received[chunk.ChunkIndex]; exists { + logger.Error( + nil, + "Duplicate snapshot chunk received", + "chunkIndex", chunk.ChunkIndex, + ) + m.resetSnapshot() + return fmt.Errorf("duplicate snapshot chunk") + } + + snapshot.received[chunk.ChunkIndex] = chunk.Targets + + if len(snapshot.received) == snapshot.totalChunks { + snapshot.complete = true + return m.applySnapshot(ctx, snapshot, logger) + } + + return nil +} + +func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshotBuffer, logger logr.Logger) error { + select { + case <-ctx.Done(): + m.resetSnapshot() + return nil + default: + } + + var allTargets 
[]core.DiscoveredTarget + for i := 0; i < snapshot.totalChunks; i++ { + select { + case <-ctx.Done(): + m.resetSnapshot() + return nil + default: + } + + chunk, ok := snapshot.received[i] + if !ok { + logger.Error( + nil, + "Missing snapshot chunk", + "chunkIndex", i, + ) + m.resetSnapshot() + return fmt.Errorf("missing snapshot chunk %d", i) + } + allTargets = append(allTargets, chunk...) + } + + logger.Info( + "Applying discovery snapshot", + "snapshotID", snapshot.snapshotID, + "targets", len(allTargets), + ) + + // todo: apply all targets + // a.applyTargets + + // Replay deferred events + for _, event := range m.deferredEvents { + select { + case <-ctx.Done(): + return nil + default: + } + if err := m.applyEvent(ctx, event, logger); err != nil { + return err + } + } + + m.resetSnapshot() + m.deferredEvents = nil + return nil +} + +func (m *MessageProcessor) processEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { + // If snapshot collecting is active defer events + if m.activeSnapshot != nil { + m.deferredEvents = append(m.deferredEvents, event) + return nil + } + + // Apply events + return m.applyEvent(ctx, event, logger) +} + +func (m *MessageProcessor) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { + switch event.Event { + case core.EventDelete: + logger.Info( + "Deleting Target", + "target", event.Target.Name, + "targetsource", m.targetSource.Name, + ) + case core.EventApply: + logger.Info( + "Applying Target", + "target", event.Target.Name, + "address", event.Target.Address, + "labels", event.Target.Labels, + "targetsource", m.targetSource.Name, + ) + } + return nil +} + +func (m *MessageProcessor) resetSnapshot() { + m.activeSnapshot = nil +} diff --git a/internal/controller/discovery/registry.go b/internal/controller/discovery/registry.go new file mode 100644 index 0000000..2193665 --- /dev/null +++ b/internal/controller/discovery/registry.go @@ -0,0 +1,49 @@ +package discovery + 
+import ( + "fmt" + "sync" +) + +// Registry is a thread-safe key -> channel registry +// K must be comparable so it can be used as a map key +// DO NOT USE a pointer type as K +type Registry[K comparable, V any] struct { + mu sync.RWMutex + m map[K]V +} + +func NewRegistry[K comparable, V any]() *Registry[K, V] { + return &Registry[K, V]{m: make(map[K]V)} +} + +func (r *Registry[K, V]) Register(key K, value V) error { + r.mu.Lock() + defer r.mu.Unlock() + if _, exists := r.m[key]; exists { + return fmt.Errorf("already registered: %v", key) + } + r.m[key] = value + return nil +} + +func (r *Registry[K, V]) Unregister(key K) { + r.mu.Lock() + delete(r.m, key) + r.mu.Unlock() +} + +func (r *Registry[K, V]) Get(key K) (V, bool) { + r.mu.RLock() + value, ok := r.m[key] + r.mu.RUnlock() + return value, ok +} + +func (r *Registry[K, V]) Exists(key K) bool { + r.mu.RLock() + defer r.mu.RUnlock() + + _, exists := r.m[key] + return exists +} diff --git a/internal/controller/discovery/target_manager.go b/internal/controller/discovery/target_manager.go deleted file mode 100644 index 245942d..0000000 --- a/internal/controller/discovery/target_manager.go +++ /dev/null @@ -1,70 +0,0 @@ -package discovery - -import ( - "context" - - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" - - gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" - "github.com/gnmic/operator/internal/controller/discovery/core" -) - -// TargetManager consumes discovered targets and applies them to Kubernetes. -type TargetManager struct { - client client.Client - scheme *runtime.Scheme - targetSource *gnmicv1alpha1.TargetSource - in <-chan []core.DiscoveryMessage -} - -// NewTargetManager wires a TargetManager instance. 
-func NewTargetManager(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage) *TargetManager { - return &TargetManager{ - client: c, - scheme: s, - targetSource: ts, - in: in, - } -} - -// Run is a long‑running loop that receives target snapshots -// and reconciles Target CRs accordingly -func (m *TargetManager) Run(ctx context.Context) error { - logger := log.FromContext(ctx). - WithValues("targetSource", m.targetSource) - - logger.Info("target manager started") - - for { - select { - case <-ctx.Done(): - logger.Info("target manager stopped") - return nil - - case targets := <-m.in: - logger.Info( - "received discovered targets", - "count", len(targets), - ) - - // List existing Target CRs owned by this TargetSource - // var existing gnmicv1alpha1.TargetList - // if err := m.client.List( - // ctx, - // &existing, - // client.MatchingLabels{ - // "gnmic.dev/targetsource": m.targetsource, - // }, - // ); err != nil { - // return err - // } - - // TODO: Target Lifecycle Management - // 1. Compare and determine which Targets to create/update/delete - // 2. Create/update/delete Target CRs accordingly - // 3. 
Update TargetSource status with sync results - } - } -} diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 8cd6f68..7cf135e 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -18,9 +18,10 @@ package controller import ( "context" - "sync" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -28,23 +29,28 @@ import ( gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery" - "github.com/gnmic/operator/internal/controller/discovery/core" - _ "github.com/gnmic/operator/internal/controller/discovery/loaders/all" + discoveryTypes "github.com/gnmic/operator/internal/controller/discovery/core" + "github.com/go-logr/logr" ) -const targetSourceFinalizer = "operator.gnmic.dev/targetsource-finalizer" - -type runningSource struct { - cancel context.CancelFunc -} - // TargetSourceReconciler reconciles a TargetSource object +// +// Responsibilities: +// - Ensure at most one discovery runtime per TargetSource +// - Start runtime on reconcile if not already running +// - Restart runtime on reconcile if spec changed +// - Stop runtime on deletion or NotFound type TargetSourceReconciler struct { client.Client Scheme *runtime.Scheme - mu sync.Mutex - running map[client.ObjectKey]runningSource + BufferSize int + ChunkSize int + + DiscoveryRegistry *discovery.Registry[ + types.NamespacedName, + discoveryTypes.DiscoveryRegistryValue, + ] } // +kubebuilder:rbac:groups=operator.gnmic.dev,resources=targetsources,verbs=get;list;watch;create;update;patch;delete @@ -55,110 +61,175 @@ type TargetSourceReconciler struct { // Reconcile is part of the main kubernetes reconciliation loop which aims to // move 
the current state of the cluster closer to the desired state. func (r *TargetSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) - - var targetSource gnmicv1alpha1.TargetSource - if err := r.Get(ctx, req.NamespacedName, &targetSource); err != nil { - // If the TargetSource no longer exists, ensure runtime cleanup - if client.IgnoreNotFound(err) == nil { - r.stopDiscovery(req.NamespacedName) + logger := log.FromContext(ctx). + WithName("targetsource-controller"). + WithValues( + "targetsource", req.NamespacedName.Name, + "namespace", req.NamespacedName.Namespace, + ) + + targetSource, err := r.fetchTargetSource(ctx, req.NamespacedName) + // If the TargetSource no longer exists, ensure runtime cleanup + if apierrors.IsNotFound(err) { + if runtime, ok := r.DiscoveryRegistry.Get(req.NamespacedName); ok { + runtime.Stop() + r.DiscoveryRegistry.Unregister(req.NamespacedName) } - return ctrl.Result{}, client.IgnoreNotFound(err) + logger.Info("TargetSource not found; stopped discovery runtime") + return ctrl.Result{}, nil + } else if err != nil { + return ctrl.Result{}, err } - logger.Info("reconciling TargetSource", "name", targetSource.Name) - - // Handle deletion with finalizer if !targetSource.DeletionTimestamp.IsZero() { - logger.Info("TargetSource is being deleted, stopping pipeline", "name", targetSource.Name) - - r.stopDiscovery(req.NamespacedName) + return r.reconcileDeletion(ctx, req.NamespacedName, targetSource) + } - // Remove finalizer if exists - if controllerutil.ContainsFinalizer(&targetSource, targetSourceFinalizer) { - controllerutil.RemoveFinalizer(&targetSource, targetSourceFinalizer) - if err := r.Update(ctx, &targetSource); err != nil { - return ctrl.Result{}, err - } - } + if err := r.ensureFinalizer(ctx, targetSource); err != nil { + return ctrl.Result{}, err + } + if r.DiscoveryRegistry.Exists(req.NamespacedName) { + logger.Info("Discovery runtime already running; reconciliation 
completed") return ctrl.Result{}, nil } - // Ensure finalizer is set - if !controllerutil.ContainsFinalizer(&targetSource, targetSourceFinalizer) { - controllerutil.AddFinalizer(&targetSource, targetSourceFinalizer) - if err := r.Update(ctx, &targetSource); err != nil { + if err := r.startDiscovery(ctx, req.NamespacedName, targetSource, logger); err != nil { + return ctrl.Result{}, err + } + + logger.Info("Started discovery runtime") + return ctrl.Result{}, nil +} + +// fetchTargetSource retrieves a TargetSource by name, handling cleanup if not found +func (r *TargetSourceReconciler) fetchTargetSource(ctx context.Context, key types.NamespacedName) (*gnmicv1alpha1.TargetSource, error) { + var targetSource gnmicv1alpha1.TargetSource + if err := r.Get(ctx, key, &targetSource); err != nil { + return nil, err + } + return &targetSource, nil +} + +// reconcileDeletion stops the discovery runtime and removes the finalizer +func (r *TargetSourceReconciler) reconcileDeletion(ctx context.Context, key types.NamespacedName, targetSource *gnmicv1alpha1.TargetSource) (ctrl.Result, error) { + logger := log.FromContext(ctx).WithValues( + "targetsource", key.Name, + "namespace", key.Namespace, + ) + logger.Info("TargetSource was marked for deletion; stopping discovery runtime") + if runtime, ok := r.DiscoveryRegistry.Get(key); ok { + runtime.Stop() + r.DiscoveryRegistry.Unregister(key) + } + + // Remove finalizer if exists + if controllerutil.ContainsFinalizer(targetSource, LabelTargetSourceFinalizer) { + controllerutil.RemoveFinalizer(targetSource, LabelTargetSourceFinalizer) + if err := r.Update(ctx, targetSource); err != nil { return ctrl.Result{}, err } - // Requeue to continue with a clean state - return ctrl.Result{}, nil + + logger.Info("Removed TargetSource finalizer") } - // TODO: - // 1. Check if a pipeline is already running for this TargetSource - // 2. If not, create and start a new pipeline: - // a. Create a Loader based on TargetSource spec - // b. 
Start the Loader in a new goroutine, passing a channel for discovered targets - // c. Start a TargetManager in another goroutine to consume discovered targets and manage Target CRs - // 3. If yes, check if the spec has changed and restart the pipeline if needed - - r.mu.Lock() - _, exists := r.running[req.NamespacedName] - r.mu.Unlock() - - // If a targetsource loader exists, return immediately without starting - // any new loader or target manager - if exists { - return ctrl.Result{}, nil + return ctrl.Result{}, nil +} + +// ensureFinalizer adds the finalizer if not present and updates the TargetSource +func (r *TargetSourceReconciler) ensureFinalizer(ctx context.Context, targetSource *gnmicv1alpha1.TargetSource) error { + if controllerutil.ContainsFinalizer(targetSource, LabelTargetSourceFinalizer) { + return nil } - loader, err := discovery.NewLoader(targetSource.ObjectMeta.Name, targetSource.ObjectMeta.Namespace, targetSource.Spec) - if err != nil { - return ctrl.Result{}, err + controllerutil.AddFinalizer(targetSource, LabelTargetSourceFinalizer) + if err := r.Update(ctx, targetSource); err != nil { + return err } - runtimeCtx, cancel := context.WithCancel(context.Background()) + log.FromContext(ctx).Info( + "Added TargetSource finalizer", + "targetsource", targetSource.Name, + "namespace", targetSource.Namespace, + ) - targetChannel := make(chan []core.DiscoveryMessage, 10) + return nil +} - // start loader - go loader.Start(runtimeCtx, targetSource.Name, targetSource.Spec, targetChannel) +// startDiscovery creates and starts a discovery runtime for a TargetSource +// +// Invariant: +// - MessageProcessor and Loader must run for the lifetime of the TargetSource +// - Any unexpected exit is treated as a bug and triggers full shutdown +func (r *TargetSourceReconciler) startDiscovery( + reconcileCtx context.Context, + key types.NamespacedName, + targetSource *gnmicv1alpha1.TargetSource, + logger logr.Logger, +) error { + targetChannel := make(chan 
[]discoveryTypes.DiscoveryMessage, r.BufferSize) + ctx, cancel := context.WithCancel(context.Background()) + loaderConfig := discoveryTypes.CommonLoaderConfig{ + TargetsourceNN: key, + ChunkSize: r.ChunkSize, + } - // start target manager - manager := discovery.NewTargetManager( + // Cleanup function to cleanup discovery runtime of targetsource + cleanup := func() { + cancel() + r.DiscoveryRegistry.Unregister(key) + } + + messageProcessor := discovery.NewMessageProcessor( r.Client, r.Scheme, - &targetSource, + targetSource, targetChannel, ) - go manager.Run(runtimeCtx) + loader, err := discovery.NewLoader(reconcileCtx, r.Client, &loaderConfig, targetSource.Spec) + if err != nil { + logger.Error(err, "Target loader could not be created") + cleanup() + return err + } - r.mu.Lock() - r.running[req.NamespacedName] = runningSource{cancel: cancel} - r.mu.Unlock() + // Register discovery runtime of targetsource + if err := r.DiscoveryRegistry.Register(key, discoveryTypes.DiscoveryRegistryValue{ + Channel: targetChannel, + Stop: cancel, + CommonLoaderConfig: &loaderConfig, + }); err != nil { + return err + } - logger.Info("TargetSource pipeline started", "name", targetSource.Name) + // Start message processor + go func() { + logger.Info("Message processor started") - return ctrl.Result{}, nil -} + if err := messageProcessor.Run(ctx); err != nil { + logger.Error(err, "Message processor exited unecpectedly") + } else { + logger.Error(nil, "Message processor exited unexpectedly without error") + } -// stopDiscovery stops and removes a running discovery pipeline -// for the given TargetSource key -func (r *TargetSourceReconciler) stopDiscovery(key client.ObjectKey) { - r.mu.Lock() - defer r.mu.Unlock() + // Any exit is considered a bug that should stop the discovery runtime + cleanup() + }() - if running, ok := r.running[key]; ok { - running.cancel() - delete(r.running, key) - } + // Start target loader + go func() { + if err := loader.Run(ctx, targetChannel); err != nil { + 
logger.Error(err, "Target loader exited unexpectedly") + } else { + logger.Error(nil, "Target loader exited unexpectedly without error") + } + }() + + return nil } // SetupWithManager sets up the controller with the Manager. func (r *TargetSourceReconciler) SetupWithManager(mgr ctrl.Manager) error { - r.running = make(map[client.ObjectKey]runningSource) - return ctrl.NewControllerManagedBy(mgr). For(&gnmicv1alpha1.TargetSource{}). Named("targetsource"). diff --git a/lab/dev/resources/targetsources/ctest1.yaml b/lab/dev/resources/targetsources/ctest1.yaml deleted file mode 100644 index e0aea43..0000000 --- a/lab/dev/resources/targetsources/ctest1.yaml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: operator.gnmic.dev/v1alpha1 -kind: TargetSource -metadata: - name: http-discovery -spec: - provider: - http: - url: http://inventory-service:8080/targets - labels: - source: inventory - type: http - profile: eos \ No newline at end of file