@@ -20,9 +20,9 @@ import (
2020)
2121
2222type (
23- NextWorkflow func (string )
24- GetSyncLimit func (context.Context , string ) (int , error )
25- IsWorkflowDeleted func (string ) bool
23+ NextWorkflow func (string )
24+ GetSyncLimit func (context.Context , string ) (int , error )
25+ WorkflowExists func (string ) bool
2626)
2727
2828type Manager struct {
@@ -31,7 +31,7 @@ type Manager struct {
3131 nextWorkflow NextWorkflow
3232 getSyncLimit GetSyncLimit
3333 syncLimitCacheTTL time.Duration
34- isWFDeleted IsWorkflowDeleted
34+ workflowExists WorkflowExists
3535 dbInfo syncdb.DBInfo
3636 queries syncdb.SyncQueries
3737 log logging.Logger
@@ -44,11 +44,11 @@ const (
4444 lockTypeMutex lockTypeName = "mutex"
4545)
4646
47- func NewLockManager (ctx context.Context , kubectlConfig kubernetes.Interface , namespace string , config * config.SyncConfig , getSyncLimit GetSyncLimit , nextWorkflow NextWorkflow , isWFDeleted IsWorkflowDeleted ) * Manager {
48- return createLockManager (ctx , syncdb .DBSessionFromConfig (ctx , kubectlConfig , namespace , config ), config , getSyncLimit , nextWorkflow , isWFDeleted )
47+ func NewLockManager (ctx context.Context , kubectlConfig kubernetes.Interface , namespace string , config * config.SyncConfig , getSyncLimit GetSyncLimit , nextWorkflow NextWorkflow , workflowExists WorkflowExists ) * Manager {
48+ return createLockManager (ctx , syncdb .DBSessionFromConfig (ctx , kubectlConfig , namespace , config ), config , getSyncLimit , nextWorkflow , workflowExists )
4949}
5050
51- func createLockManager (ctx context.Context , dbSession db.Session , config * config.SyncConfig , getSyncLimit GetSyncLimit , nextWorkflow NextWorkflow , isWFDeleted IsWorkflowDeleted ) * Manager {
51+ func createLockManager (ctx context.Context , dbSession db.Session , config * config.SyncConfig , getSyncLimit GetSyncLimit , nextWorkflow NextWorkflow , workflowExists WorkflowExists ) * Manager {
5252 syncLimitCacheTTL := time .Duration (0 )
5353 if config != nil && config .SemaphoreLimitCacheSeconds != nil {
5454 syncLimitCacheTTL = time .Duration (* config .SemaphoreLimitCacheSeconds ) * time .Second
@@ -66,7 +66,7 @@ func createLockManager(ctx context.Context, dbSession db.Session, config *config
6666 nextWorkflow : nextWorkflow ,
6767 getSyncLimit : getSyncLimit ,
6868 syncLimitCacheTTL : syncLimitCacheTTL ,
69- isWFDeleted : isWFDeleted ,
69+ workflowExists : workflowExists ,
7070 dbInfo : dbInfo ,
7171 queries : syncdb .NewSyncQueries (dbSession , dbInfo .Config ),
7272 log : log ,
@@ -116,7 +116,7 @@ func (sm *Manager) CheckWorkflowExistence(ctx context.Context) {
116116 if err != nil {
117117 continue
118118 }
119- if ! sm .isWFDeleted (wfKey ) {
119+ if ! sm .workflowExists (wfKey ) {
120120 lock .release (ctx , holderKeys )
121121 if err := lock .removeFromQueue (ctx , holderKeys ); err != nil {
122122 sm .log .WithField ("holderKeys" , holderKeys ).WithError (err ).Warn (ctx , "failed to remove from queue" )
0 commit comments