@@ -103,7 +103,7 @@ type ObjectStorageController struct {
103103
104104 lockerLock sync.Mutex
105105 locker map [types.UID ]* sync.Mutex
106- opMap map [types. UID ] interface {}
106+ opMap * sync. Map
107107}
108108
109109func NewDefaultObjectStorageController (identity string , leaderLockName string , threads int ) (* ObjectStorageController , error ) {
@@ -162,7 +162,7 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
162162 RenewDeadline : 15 * time .Second ,
163163 RetryPeriod : 5 * time .Second ,
164164
165- opMap : map [types. UID ] interface {} {},
165+ opMap : & sync. Map {},
166166 }, nil
167167}
168168
@@ -253,18 +253,19 @@ func (c *ObjectStorageController) processNextItem(ctx context.Context) bool {
253253
254254 uuid := uuidInterface .(types.UID )
255255 var err error
256- // With the lock below in place, we can safely tell the queue that we are done
257- // processing this item. The lock will ensure that multiple items of the same
258- // name and kind do not get processed simultaneously
256+
259257 defer c .queue .Done (uuid )
260258
259+ op , ok := c .opMap .Load (uuid )
260+ if ! ok {
261+ panic ("unreachable code" )
262+ }
263+
261264 // Ensure that multiple operations on different versions of the same object
262265 // do not happen in parallel
263266 c .OpLock (uuid )
264267 defer c .OpUnlock (uuid )
265268
266- op := c .opMap [uuid ]
267-
268269 switch o := op .(type ) {
269270 case addOp :
270271 add := * o .AddFunc
@@ -332,12 +333,12 @@ func (c *ObjectStorageController) GetOpLock(op types.UID) *sync.Mutex {
332333}
333334
334335// handleErr checks if an error happened and makes sure we will retry later.
335- func (c * ObjectStorageController ) handleErr (err error , op interface {} ) {
336+ func (c * ObjectStorageController ) handleErr (err error , uuid types. UID ) {
336337 if err == nil {
337- c .queue . Forget ( op )
338+ c .opMap . Delete ( uuid )
338339 return
339340 }
340- c .queue .AddRateLimited (op )
341+ c .queue .AddRateLimited (uuid )
341342}
342343
343344func (c * ObjectStorageController ) runController (ctx context.Context ) {
@@ -349,7 +350,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
349350 cfg := & cache.Config {
350351 Queue : cache .NewDeltaFIFOWithOptions (cache.DeltaFIFOOptions {
351352 KnownObjects : indexer ,
352- EmitDeltaTypeReplaced : true ,
353+ EmitDeltaTypeReplaced : false ,
353354 }),
354355 ListerWatcher : lw ,
355356 ObjectType : objType ,
@@ -370,15 +371,14 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
370371 }
371372
372373 uuid := d .Object .(metav1.Object ).GetUID ()
373- c .OpLock (uuid )
374- defer c .OpUnlock (uuid )
375- c .opMap [uuid ] = updateOp {
374+
375+ c .opMap .Store (uuid , updateOp {
376376 OldObject : old ,
377377 NewObject : d .Object ,
378378 UpdateFunc : & update ,
379379 Key : key ,
380380 Indexer : indexer ,
381- }
381+ })
382382 c .queue .Add (uuid )
383383 } else {
384384 key , err := cache .MetaNamespaceKeyFunc (d .Object )
@@ -387,20 +387,17 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
387387 }
388388
389389 uuid := d .Object .(metav1.Object ).GetUID ()
390- c .OpLock (uuid )
391- defer c .OpUnlock (uuid )
392-
393- // If an update to the k8s object happens before add has succeeded
394- if op , ok := c .opMap [uuid ]; ok {
395- if _ , ok := op .(updateOp ); ok {
396- return fmt .Errorf ("cannot add already added object: %s" , key )
397- }
398- }
399- c .opMap [uuid ] = addOp {
390+
391+ if op , ok := c .opMap .LoadOrStore (uuid , addOp {
400392 Object : d .Object ,
401393 AddFunc : & add ,
402394 Key : key ,
403395 Indexer : indexer ,
396+ }); ok { // If an update to the k8s object happens before add has succeeded
397+ if _ , ok := op .(updateOp ); ok {
398+ err := fmt .Errorf ("cannot add already added object: %s" , key )
399+ return err
400+ }
404401 }
405402 c .queue .Add (uuid )
406403 }
@@ -411,14 +408,12 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
411408 }
412409
413410 uuid := d .Object .(metav1.Object ).GetUID ()
414- c .OpLock (uuid )
415- defer c .OpUnlock (uuid )
416- c .opMap [uuid ] = deleteOp {
411+ c .opMap .Store (uuid , deleteOp {
417412 Object : d .Object ,
418413 DeleteFunc : & delete ,
419414 Key : key ,
420415 Indexer : indexer ,
421- }
416+ })
422417 c .queue .Add (uuid )
423418 }
424419 }
0 commit comments