Skip to content

Commit 4ba1ea8

Browse files
committed
[FEAT] Operator: Concurrent Reconciles supported
Enable concurrent reconciles for resources with the following defaults: ``` { CAPApplication: 1, CAPApplicationVersion: 3, CAPTenant: 10, CAPTenantOperation: 10, Domain: 1, ClusterDomain: 1, } ``` These defaults can be overridden via the corresponding environment variables on the CAP Operator controller: ``` "MAX_CONCURRENT_RECONCILES_CAP_APPLICATION", "MAX_CONCURRENT_RECONCILES_CAP_APPLICATION_VERSION", "MAX_CONCURRENT_RECONCILES_CAP_TENANT", "MAX_CONCURRENT_RECONCILES_CAP_TENANT_OPERATION", "MAX_CONCURRENT_RECONCILES_DOMAIN", "MAX_CONCURRENT_RECONCILES_CLUSTER_DOMAIN", ``` In addition, the following changes were made: - a custom rate-limiter workqueue configuration for tenants and tenant operations, to avoid long delays when processing these resources; - the default resync period for operator custom resources is now set to 5 minutes.
1 parent e9c0784 commit 4ba1ea8

3 files changed

Lines changed: 107 additions & 28 deletions

File tree

cmd/controller/main.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ package main
77

88
import (
99
"context"
10+
"maps"
1011
"os"
1112
"os/signal"
13+
"strconv"
1214
"syscall"
1315
"time"
1416

@@ -31,7 +33,8 @@ import (
3133
)
3234

3335
const (
34-
LeaseLockName = "capoperator-lease-lock"
36+
LeaseLockName = "capoperator-lease-lock"
37+
MaxConcurrentReconcilesEnvPrefix = "MAX_CONCURRENT_RECONCILES_"
3538
)
3639

3740
func main() {
@@ -82,6 +85,9 @@ func main() {
8285
// Initialize/start metrics server
8386
util.InitMetricsServer()
8487

88+
// Get concurrency config for each resource kind from environment variables or use defaults
89+
concurrencyConfig := getDefaultConcurrencyConfig()
90+
8591
// context for the reconciliation controller
8692
ctx, cancel := context.WithCancel(context.Background())
8793
defer cancel()
@@ -133,6 +139,8 @@ func main() {
133139
klog.InfoS("check & update of subscriptionGUID label done")
134140

135141
c := controller.NewController(coreClient, crdClient, istioClient, certClient, certManagerClient, dnsClient, promClient)
142+
// Update the controller's concurrency config before starting the controller
143+
maps.Copy(controller.DefaultConcurrentReconciles, concurrencyConfig)
136144
go c.Start(ctx)
137145
},
138146
OnStoppedLeading: func() {
@@ -148,3 +156,28 @@ func main() {
148156
},
149157
})
150158
}
159+
160+
func getDefaultConcurrencyConfig() map[int]int {
161+
// inline function to get concurrency config for each resource kind from environment variables or use defaults
162+
getConcurrencyConfigForResource := func(resourceEnvSuffix string, defaultVal int) int {
163+
reconcileEnv := MaxConcurrentReconcilesEnvPrefix + resourceEnvSuffix
164+
if val := os.Getenv(reconcileEnv); val != "" {
165+
if intVal, err := strconv.Atoi(val); err == nil {
166+
return intVal
167+
}
168+
}
169+
return defaultVal
170+
}
171+
172+
// Configure default concurrency for each resource kind, can be overridden by environment variables
173+
concurrencyConfig := make(map[int]int)
174+
175+
for resourceKey, resourceEnvSuffix := range controller.ResourceEnvSuffixMap {
176+
defaultReconcileForResource, ok := controller.DefaultConcurrentReconciles[resourceKey]
177+
if !ok {
178+
defaultReconcileForResource = controller.DefaultReconcile
179+
}
180+
concurrencyConfig[resourceKey] = getConcurrencyConfigForResource(resourceEnvSuffix, defaultReconcileForResource)
181+
}
182+
return concurrencyConfig
183+
}

internal/controller/controller.go

Lines changed: 69 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/sap/cap-operator/pkg/client/clientset/versioned"
2424
v1alpha1scheme "github.com/sap/cap-operator/pkg/client/clientset/versioned/scheme"
2525
crdInformers "github.com/sap/cap-operator/pkg/client/informers/externalversions"
26+
"golang.org/x/time/rate"
2627
istio "istio.io/client-go/pkg/clientset/versioned"
2728
istioscheme "istio.io/client-go/pkg/clientset/versioned/scheme"
2829
istioInformers "istio.io/client-go/pkg/informers/externalversions"
@@ -56,17 +57,35 @@ type Controller struct {
5657
eventRecorder events.EventRecorder
5758
}
5859

60+
var (
61+
// Application and Domain resources are less frequently updated, so assume a default concurrency of 1.
62+
DefaultReconcile = 1
63+
DefaultConcurrentReconciles = map[int]int{
64+
ResourceCAPApplicationVersion: 3, // Moderate concurrency to handle multiple versions efficiently
65+
ResourceCAPTenant: 10, // High concurrency to handle multiple tenants efficiently
66+
ResourceCAPTenantOperation: 10, // High concurrency to handle multiple tenant operations efficiently
67+
}
68+
ResourceEnvSuffixMap = map[int]string{
69+
ResourceCAPApplication: "CAP_APPLICATION",
70+
ResourceCAPApplicationVersion: "CAP_APPLICATION_VERSION",
71+
ResourceCAPTenant: "CAP_TENANT",
72+
ResourceCAPTenantOperation: "CAP_TENANT_OPERATION",
73+
ResourceDomain: "DOMAIN",
74+
ResourceClusterDomain: "CLUSTER_DOMAIN",
75+
}
76+
)
77+
5978
func NewController(client kubernetes.Interface, crdClient versioned.Interface, istioClient istio.Interface, gardenerCertificateClient gardenerCert.Interface, certManagerCertificateClient certManager.Interface, gardenerDNSClient gardenerDNS.Interface, promClient promop.Interface) *Controller {
6079
// Register metrics provider on the workqueue
6180
initializeMetrics()
6281

6382
queues := map[int]workqueue.TypedRateLimitingInterface[QueueItem]{
6483
ResourceCAPApplication: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPApplication]}),
6584
ResourceCAPApplicationVersion: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPApplicationVersion]}),
66-
ResourceCAPTenant: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenant]}),
67-
ResourceCAPTenantOperation: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenantOperation]}),
68-
ResourceClusterDomain: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceClusterDomain]}),
85+
ResourceCAPTenant: workqueue.NewTypedRateLimitingQueueWithConfig(customRateLimiter(), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenant]}),
86+
ResourceCAPTenantOperation: workqueue.NewTypedRateLimitingQueueWithConfig(customRateLimiter(), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenantOperation]}),
6987
ResourceDomain: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceDomain]}),
88+
ResourceClusterDomain: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceClusterDomain]}),
7089
}
7190

7291
// Use 30mins as the default Resync interval for kube / proprietary resources
@@ -89,8 +108,8 @@ func NewController(client kubernetes.Interface, crdClient versioned.Interface, i
89108
// no activity needed on our side so far
90109
}
91110

92-
// Use 60 as the default Resync interval for our custom resources (CAP CROs)
93-
crdInformerFactory := crdInformers.NewSharedInformerFactory(crdClient, 60*time.Second)
111+
// Use 5 mins as the default Resync interval for our custom resources (CAP CROs)
112+
crdInformerFactory := crdInformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
94113

95114
// initialize event recorder
96115
scheme := runtime.NewScheme()
@@ -122,6 +141,21 @@ func NewController(client kubernetes.Interface, crdClient versioned.Interface, i
122141
return c
123142
}
124143

144+
// Custom Rate limiter for Tenant and TenantOperation queues to allow faster retries and higher throughput.
145+
func customRateLimiter() workqueue.TypedRateLimiter[QueueItem] {
146+
return workqueue.NewTypedMaxOfRateLimiter(
147+
// Faster exponential backoff for transient errors
148+
workqueue.NewTypedItemExponentialFailureRateLimiter[QueueItem](
149+
10*time.Millisecond, // base delay (was 5ms)
150+
300*time.Second, // max delay (was ~1000s)
151+
),
152+
// Higher QPS for bulk processing
153+
&workqueue.TypedBucketRateLimiter[QueueItem]{
154+
Limiter: rate.NewLimiter(rate.Limit(50), 200), // 50 QPS, 200 burst (was 10/100)
155+
},
156+
)
157+
}
158+
125159
func throwInformerStartError(resources map[reflect.Type]bool) {
126160
for resource, ok := range resources {
127161
if !ok {
@@ -179,15 +213,19 @@ func (c *Controller) Start(ctx context.Context) {
179213

180214
var wg sync.WaitGroup
181215
for k := range c.queues {
182-
wg.Add(1)
183-
go func(key int) {
184-
defer wg.Done()
185-
err := c.processQueue(qCxt, key)
186-
if err != nil {
187-
klog.ErrorS(err, "worker queue ended with error", "key", key)
188-
}
189-
qCancel() // cancel context to inform other workers
190-
}(k)
216+
concurrency := getConcurrencyForResource(k)
217+
klog.InfoS("starting worker queue", "resource", getResourceKindFromKey(k), "concurrency", concurrency)
218+
for i := range concurrency {
219+
wg.Add(1)
220+
go func(key, workerId int) {
221+
defer wg.Done()
222+
err := c.processQueue(qCxt, key, workerId)
223+
if err != nil {
224+
klog.ErrorS(err, "worker queue ended with error", "key", key)
225+
}
226+
qCancel() // cancel context to inform other workers
227+
}(k, i)
228+
}
191229
}
192230

193231
// start version cleanup routines
@@ -199,33 +237,41 @@ func (c *Controller) Start(ctx context.Context) {
199237
wg.Wait()
200238
}
201239

202-
func (c *Controller) processQueue(ctx context.Context, key int) error {
203-
klog.InfoS("starting to process queue", "resource", getResourceKindFromKey(key))
240+
func getConcurrencyForResource(key int) int {
241+
concurrency, ok := DefaultConcurrentReconciles[key]
242+
if !ok {
243+
concurrency = DefaultReconcile // default concurrency
244+
}
245+
return concurrency
246+
}
247+
248+
func (c *Controller) processQueue(ctx context.Context, key, workerId int) error {
249+
klog.InfoS("starting to process queue", "resource", getResourceKindFromKey(key), "workerId", workerId)
204250
for {
205251
select {
206252
case <-ctx.Done():
207-
klog.InfoS("context done; ending processing of queue", "resource", getResourceKindFromKey(key))
253+
klog.InfoS("context done; ending processing of queue", "resource", getResourceKindFromKey(key), "workerId", workerId)
208254
return nil
209255
default: // fall through - to avoid blocking
210-
err := c.processQueueItem(ctx, key)
256+
err := c.processQueueItem(ctx, key, workerId)
211257
if err != nil {
212258
return err
213259
}
214260
}
215261
}
216262
}
217263

218-
func (c *Controller) processQueueItem(ctx context.Context, key int) error {
264+
func (c *Controller) processQueueItem(ctx context.Context, key, workerId int) error {
219265
q, ok := c.queues[key]
220266
if !ok {
221267
return fmt.Errorf("unknown queue; ending worker %d", key)
222268
}
223269

224-
klog.V(2).InfoS("Processing queue item in work queue", "resource", getResourceKindFromKey(key), "queue length", q.Len())
270+
klog.V(2).InfoS("Processing queue item in work queue", "resource", getResourceKindFromKey(key), "queue length", q.Len(), "workerId", workerId)
225271

226272
item, shutdown := q.Get()
227273
if shutdown {
228-
return fmt.Errorf("queue (%d) shutdown", key) // stop processing when the queue has been shutdown
274+
return fmt.Errorf("queue (%d, %d) shutdown", key, workerId) // stop processing when the queue has been shutdown
229275
}
230276

231277
// [IMPORTANT] always mark the item as done (after processing it)
@@ -242,7 +288,7 @@ func (c *Controller) processQueueItem(ctx context.Context, key int) error {
242288
// Attempt to recover panics during reconciliation.
243289
defer c.recoverFromPanic(ctx, item, q)
244290

245-
klog.InfoS("Processing Resource", "namespace", item.ResourceKey.Namespace, "name", item.ResourceKey.Name, "kind", getResourceKindFromKey(key), "attempt", attempts)
291+
klog.InfoS("Processing Resource", "namespace", item.ResourceKey.Namespace, "name", item.ResourceKey.Name, "kind", getResourceKindFromKey(key), "attempt", attempts, "workerId", workerId)
246292

247293
switch item.Key {
248294
case ResourceCAPApplication:
@@ -263,7 +309,7 @@ func (c *Controller) processQueueItem(ctx context.Context, key int) error {
263309
}
264310
// Handle reconcile errors
265311
if err != nil {
266-
klog.ErrorS(err, "queue processing error", "resource", getResourceKindFromKey(key))
312+
klog.ErrorS(err, "queue processing error", "resource", getResourceKindFromKey(key), "workerId", workerId)
267313
ReconcileErrors.WithLabelValues(getResourceKindFromKey(item.Key), item.ResourceKey.Namespace, item.ResourceKey.Name).Inc()
268314
if !skipItem {
269315
// add back to queue for re-processing

internal/controller/controller_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,9 @@ func TestController_processQueue(t *testing.T) {
108108
// Manual API checks
109109
var expectedRes error
110110
if tt.earlyShutDown {
111-
expectedRes = testC.processQueue(ctx, tt.resource)
111+
expectedRes = testC.processQueue(ctx, tt.resource, 0)
112112
} else {
113-
expectedRes = testC.processQueue(context.TODO(), tt.resource)
113+
expectedRes = testC.processQueue(context.TODO(), tt.resource, 0)
114114
}
115115

116116
if !tt.expectError && expectedRes != nil {
@@ -263,12 +263,12 @@ func TestController_processQueueItem(t *testing.T) {
263263
if tt.earlyShutDown {
264264
q.ShutDown()
265265
cancel()
266-
expectedRes = testC.processQueueItem(ctx, tt.resource)
266+
expectedRes = testC.processQueueItem(ctx, tt.resource, 0)
267267
} else {
268268
if tt.resource < 4 || tt.resource == 9 || tt.resource == 99 {
269269
q.Add(item)
270270
}
271-
expectedRes = testC.processQueueItem(context.TODO(), tt.resource)
271+
expectedRes = testC.processQueueItem(context.TODO(), tt.resource, 0)
272272
}
273273

274274
if !tt.expectError && expectedRes != nil {

0 commit comments

Comments (0)