@@ -5,25 +5,21 @@ import (
55 "fmt"
66 "io/ioutil"
77 "os"
8+ "reflect"
89 "regexp"
910 "strings"
1011 "sync"
1112 "time"
1213
13- "golang.org/x/time/rate"
14-
15- // objectstorage
1614 v1alpha1 "sigs.k8s.io/container-object-storage-interface-api/apis/objectstorage.k8s.io/v1alpha1"
1715 bucketclientset "sigs.k8s.io/container-object-storage-interface-api/clientset"
1816
19- // k8s api
2017 v1 "k8s.io/api/core/v1"
18+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2119 "k8s.io/apimachinery/pkg/fields"
2220 "k8s.io/apimachinery/pkg/runtime"
21+ "k8s.io/apimachinery/pkg/types"
2322 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
24- "k8s.io/apimachinery/pkg/util/wait"
25-
26- // k8s client
2723 kubeclientset "k8s.io/client-go/kubernetes"
2824 "k8s.io/client-go/kubernetes/scheme"
2925 corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -35,10 +31,7 @@ import (
3531 "k8s.io/client-go/tools/record"
3632 "k8s.io/client-go/util/workqueue"
3733
38- // logging
3934 "github.com/golang/glog"
40-
41- // config
4235 "github.com/spf13/viper"
4336)
4437
@@ -49,6 +42,7 @@ type deleteFunc func(ctx context.Context, obj interface{}) error
4942type addOp struct {
5043 Object interface {}
5144 AddFunc * addFunc
45+ Indexer cache.Indexer
5246
5347 Key string
5448}
@@ -61,6 +55,7 @@ type updateOp struct {
6155 OldObject interface {}
6256 NewObject interface {}
6357 UpdateFunc * updateFunc
58+ Indexer cache.Indexer
6459
6560 Key string
6661}
@@ -72,6 +67,7 @@ func (u updateOp) String() string {
7267type deleteOp struct {
7368 Object interface {}
7469 DeleteFunc * deleteFunc
70+ Indexer cache.Indexer
7571
7672 Key string
7773}
@@ -105,15 +101,13 @@ type ObjectStorageController struct {
105101 bucketClient bucketclientset.Interface
106102 kubeClient kubeclientset.Interface
107103
108- locker map [string ]* sync.Mutex
109104 lockerLock sync.Mutex
105+ locker map [types.UID ]* sync.Mutex
106+ opMap map [types.UID ]interface {}
110107}
111108
112109func NewDefaultObjectStorageController (identity string , leaderLockName string , threads int ) (* ObjectStorageController , error ) {
113- rateLimit := workqueue .NewMaxOfRateLimiter (
114- workqueue .NewItemExponentialFailureRateLimiter (100 * time .Millisecond , 600 * time .Second ),
115- & workqueue.BucketRateLimiter {Limiter : rate .NewLimiter (rate .Limit (10 ), 100 )},
116- )
110+ rateLimit := workqueue .NewItemExponentialFailureRateLimiter (100 * time .Millisecond , 30 * time .Second )
117111 return NewObjectStorageController (identity , leaderLockName , threads , rateLimit )
118112}
119113
@@ -162,10 +156,13 @@ func NewObjectStorageControllerWithClientset(identity string, leaderLockName str
162156 queue : workqueue .NewRateLimitingQueue (limiter ),
163157 threadiness : threads ,
164158
165- ResyncPeriod : 30 * time .Second ,
166- LeaseDuration : 15 * time .Second ,
167- RenewDeadline : 10 * time .Second ,
159+ ResyncPeriod : 30 * time .Second ,
160+ // leader election
161+ LeaseDuration : 60 * time .Second ,
162+ RenewDeadline : 15 * time .Second ,
168163 RetryPeriod : 5 * time .Second ,
164+
165+ opMap : map [types.UID ]interface {}{},
169166 }, nil
170167}
171168
@@ -219,10 +216,11 @@ func (c *ObjectStorageController) Run(ctx context.Context) error {
219216 }
220217
221218 leaderConfig := leaderelection.LeaderElectionConfig {
222- Lock : l ,
223- LeaseDuration : c .LeaseDuration ,
224- RenewDeadline : c .RenewDeadline ,
225- RetryPeriod : c .RetryPeriod ,
219+ Lock : l ,
220+ ReleaseOnCancel : true ,
221+ LeaseDuration : c .LeaseDuration ,
222+ RenewDeadline : c .RenewDeadline ,
223+ RetryPeriod : c .RetryPeriod ,
226224 Callbacks : leaderelection.LeaderCallbacks {
227225 OnStartedLeading : func (ctx context.Context ) {
228226 glog .V (2 ).Info ("became leader, starting" )
@@ -248,75 +246,85 @@ func (c *ObjectStorageController) runWorker(ctx context.Context) {
248246
249247func (c * ObjectStorageController ) processNextItem (ctx context.Context ) bool {
250248 // Wait until there is a new item in the working queue
251- op , quit := c .queue .Get ()
249+ uuidInterface , quit := c .queue .Get ()
252250 if quit {
253251 return false
254252 }
255253
254+ uuid := uuidInterface .(types.UID )
255+ var err error
256256 // With the lock below in place, we can safely tell the queue that we are done
257257 // processing this item. The lock will ensure that multiple items of the same
258258 // name and kind do not get processed simultaneously
259- defer c .queue .Done (op )
259+ defer c .queue .Done (uuid )
260260
261261 // Ensure that multiple operations on different versions of the same object
262262 // do not happen in parallel
263- c .OpLock (op )
264- defer c .OpUnlock (op )
263+ c .OpLock (uuid )
264+ defer c .OpUnlock (uuid )
265+
266+ op := c .opMap [uuid ]
265267
266- var err error
267268 switch o := op .(type ) {
268269 case addOp :
269270 add := * o .AddFunc
271+ objMeta := o .Object .(metav1.Object )
272+ name := objMeta .GetName ()
273+ ns := objMeta .GetNamespace ()
270274 err = add (ctx , o .Object )
275+ if err == nil {
276+ o .Indexer .Add (o .Object )
277+ } else {
278+ glog .Errorf ("Error adding %s %s: %v" , ns , name , err )
279+ }
271280 case updateOp :
272281 update := * o .UpdateFunc
282+ objMeta := o .OldObject .(metav1.Object )
283+ name := objMeta .GetName ()
284+ ns := objMeta .GetNamespace ()
273285 err = update (ctx , o .OldObject , o .NewObject )
286+ if err == nil {
287+ o .Indexer .Update (o .NewObject )
288+ } else {
289+ glog .Errorf ("Error updating %s %s: %v" , ns , name , err )
290+ }
274291 case deleteOp :
275292 delete := * o .DeleteFunc
293+ objMeta := o .Object .(metav1.Object )
294+ name := objMeta .GetName ()
295+ ns := objMeta .GetNamespace ()
276296 err = delete (ctx , o .Object )
297+ if err == nil {
298+ o .Indexer .Delete (o .Object )
299+ } else {
300+ glog .Errorf ("Error deleting %s %s: %v" , ns , name , err )
301+ }
277302 default :
278303 panic ("unknown item in queue" )
279304 }
280305
281306 // Handle the error if something went wrong
282- c .handleErr (err , op )
307+ c .handleErr (err , uuid )
283308 return true
284309}
285310
286- func (c * ObjectStorageController ) OpLock (op interface {} ) {
311+ func (c * ObjectStorageController ) OpLock (op types. UID ) {
287312 c .GetOpLock (op ).Lock ()
288313}
289314
290- func (c * ObjectStorageController ) OpUnlock (op interface {} ) {
315+ func (c * ObjectStorageController ) OpUnlock (op types. UID ) {
291316 c .GetOpLock (op ).Unlock ()
292317}
293318
294- func (c * ObjectStorageController ) GetOpLock (op interface {}) * sync.Mutex {
295- var key string
296- var ext string
297-
298- switch o := op .(type ) {
299- case addOp :
300- key = o .Key
301- ext = fmt .Sprintf ("%v" , o .AddFunc )
302- case updateOp :
303- key = o .Key
304- ext = fmt .Sprintf ("%v" , o .UpdateFunc )
305- case deleteOp :
306- key = o .Key
307- ext = fmt .Sprintf ("%v" , o .DeleteFunc )
308- default :
309- panic ("unknown item in queue" )
310- }
319+ func (c * ObjectStorageController ) GetOpLock (op types.UID ) * sync.Mutex {
320+ lockKey := op
321+ c .lockerLock .Lock ()
322+ defer c .lockerLock .Unlock ()
311323
312- lockKey := fmt .Sprintf ("%s/%s" , key , ext )
313324 if c .locker == nil {
314- c .locker = map [string ]* sync.Mutex {}
325+ c .locker = map [types. UID ]* sync.Mutex {}
315326 }
316327
317- c .lockerLock .Lock ()
318- defer c .lockerLock .Unlock ()
319-
320328 if _ , ok := c .locker [lockKey ]; ! ok {
321329 c .locker [lockKey ] = & sync.Mutex {}
322330 }
@@ -326,36 +334,15 @@ func (c *ObjectStorageController) GetOpLock(op interface{}) *sync.Mutex {
326334// handleErr checks if an error happened and makes sure we will retry later.
327335func (c * ObjectStorageController ) handleErr (err error , op interface {}) {
328336 if err == nil {
329- // Forget about the #AddRateLimited history of the op on every successful synchronization.
330- // This ensures that future processing of updates for this op is not delayed because of
331- // an outdated error history.
332337 c .queue .Forget (op )
333338 return
334339 }
335-
336- /* TODO: Determine if there is a maxium number of retries or time allowed before giving up
337- // This controller retries 5 times if something goes wrong. After that, it stops trying.
338- if c.queue.NumRequeues(op) < 5 {
339- klog.Infof("Error syncing op %v: %v", key, err)
340-
341- // Re-enqueue the key rate limited. Based on the rate limiter on the
342- // queue and the re-enqueue history, the op will be processed later again.
343- c.queue.AddRateLimited(op)
344- return
345- }
346-
347- c.queue.Forget(key)
348- // Report to an external entity that, even after several retries, we could not successfully process this op
349- utilruntime.HandleError(err)
350- klog.Infof("Dropping op %+v out of the queue: %v", op, err)
351- */
352- glog .V (5 ).Infof ("Error executing operation %+v: %+v" , op , err )
353340 c .queue .AddRateLimited (op )
354341}
355342
356343func (c * ObjectStorageController ) runController (ctx context.Context ) {
357344 controllerFor := func (name string , objType runtime.Object , add addFunc , update updateFunc , delete deleteFunc ) {
358- indexer := cache .NewIndexer (cache .MetaNamespaceKeyFunc , cache.Indexers {})
345+ indexer := cache .NewIndexer (cache .DeletionHandlingMetaNamespaceKeyFunc , cache.Indexers {})
359346 resyncPeriod := c .ResyncPeriod
360347
361348 lw := cache .NewListWatchFromClient (c .bucketClient .ObjectstorageV1alpha1 ().RESTClient (), name , "" , fields .Everything ())
@@ -378,38 +365,61 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
378365 panic (err )
379366 }
380367
381- c .queue .Add (updateOp {
368+ if reflect .DeepEqual (d .Object , old ) {
369+ return nil
370+ }
371+
372+ uuid := d .Object .(metav1.Object ).GetUID ()
373+ c .OpLock (uuid )
374+ defer c .OpUnlock (uuid )
375+ c .opMap [uuid ] = updateOp {
382376 OldObject : old ,
383377 NewObject : d .Object ,
384378 UpdateFunc : & update ,
385379 Key : key ,
386- })
387- return indexer .Update (d .Object )
380+ Indexer : indexer ,
381+ }
382+ c .queue .Add (uuid )
388383 } else {
389384 key , err := cache .MetaNamespaceKeyFunc (d .Object )
390385 if err != nil {
391386 panic (err )
392387 }
393388
394- c .queue .Add (addOp {
389+ uuid := d .Object .(metav1.Object ).GetUID ()
390+ c .OpLock (uuid )
391+ defer c .OpUnlock (uuid )
392+
393+ // If an update to the k8s object happens before add has succeeded
394+ if op , ok := c .opMap [uuid ]; ok {
395+ if _ , ok := op .(updateOp ); ok {
396+ return fmt .Errorf ("cannot add already added object: %s" , key )
397+ }
398+ }
399+ c .opMap [uuid ] = addOp {
395400 Object : d .Object ,
396401 AddFunc : & add ,
397402 Key : key ,
398- })
399- return indexer .Add (d .Object )
403+ Indexer : indexer ,
404+ }
405+ c .queue .Add (uuid )
400406 }
401407 case cache .Deleted :
402408 key , err := cache .DeletionHandlingMetaNamespaceKeyFunc (d .Object )
403409 if err != nil {
404410 panic (err )
405411 }
406412
407- c .queue .Add (deleteOp {
413+ uuid := d .Object .(metav1.Object ).GetUID ()
414+ c .OpLock (uuid )
415+ defer c .OpUnlock (uuid )
416+ c .opMap [uuid ] = deleteOp {
408417 Object : d .Object ,
409418 DeleteFunc : & delete ,
410419 Key : key ,
411- })
412- return indexer .Delete (d .Object )
420+ Indexer : indexer ,
421+ }
422+ c .queue .Add (uuid )
413423 }
414424 }
415425 return nil
@@ -429,7 +439,7 @@ func (c *ObjectStorageController) runController(ctx context.Context) {
429439 }
430440
431441 for i := 0 ; i < c .threadiness ; i ++ {
432- go wait . UntilWithContext ( ctx , c .runWorker , time . Second )
442+ go c .runWorker ( ctx )
433443 }
434444
435445 <- ctx .Done ()
0 commit comments