Skip to content

Commit 5dd3a78

Browse files
committed
[core] Subworkflow include implementation (includeRole)
This also fixes an unreported issue that would have prevented callRoles from being wrapped in iteratorRoles.
1 parent 81ed506 commit 5dd3a78

10 files changed

Lines changed: 412 additions & 30 deletions

File tree

core/workflow/aggregator.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,14 @@ type _unionTypeProbe struct {
5252
Task *struct{}
5353
Roles []interface{}
5454
Call *struct{}
55+
Include *string
5556
}
5657
type _roleUnion struct{
5758
*iteratorRole
5859
*aggregatorRole
5960
*taskRole
6061
*callRole
62+
*includeRole
6163
}
6264

6365
func (union *_roleUnion) UnmarshalYAML(unmarshal func(interface{}) error) (unionErr error) {
@@ -70,12 +72,14 @@ func (union *_roleUnion) UnmarshalYAML(unmarshal func(interface{}) error) (union
7072
switch {
7173
case _probe.For != nil:
7274
unionErr = unmarshal(&union.iteratorRole)
73-
case _probe.Roles != nil && _probe.Task == nil && _probe.Call == nil:
75+
case _probe.Roles != nil && _probe.Task == nil && _probe.Call == nil && _probe.Include == nil:
7476
unionErr = unmarshal(&union.aggregatorRole)
75-
case _probe.Task != nil && _probe.Roles == nil && _probe.Call == nil:
77+
case _probe.Task != nil && _probe.Roles == nil && _probe.Call == nil && _probe.Include == nil:
7678
unionErr = unmarshal(&union.taskRole)
77-
case _probe.Call != nil && _probe.Task == nil && _probe.Roles == nil:
79+
case _probe.Call != nil && _probe.Task == nil && _probe.Roles == nil && _probe.Include == nil:
7880
unionErr = unmarshal(&union.callRole)
81+
case _probe.Include != nil && _probe.Task == nil && _probe.Roles == nil && _probe.Call == nil:
82+
unionErr = unmarshal(&union.includeRole)
7983
default:
8084
unionErr = errors.New("cannot unmarshal invalid role to union")
8185
}
@@ -106,6 +110,8 @@ func (r *aggregator) UnmarshalYAML(unmarshal func(interface{}) error) (err error
106110
roles[i] = v.taskRole
107111
case v.callRole != nil:
108112
roles[i] = v.callRole
113+
case v.includeRole != nil:
114+
roles[i] = v.includeRole
109115
default:
110116
err = errors.New("invalid child role at index " + strconv.Itoa(i))
111117
return

core/workflow/aggregatorrole.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (r *aggregatorRole) GlobFilter(g glob.Glob) (rs []Role) {
9999
return
100100
}
101101

102-
func (r *aggregatorRole) ProcessTemplates(workflowRepo *repos.Repo) (err error) {
102+
func (r *aggregatorRole) ProcessTemplates(workflowRepo *repos.Repo, loadSubworkflow LoadSubworkflowFunc) (err error) {
103103
if r == nil {
104104
return errors.New("role tree error when processing templates")
105105
}
@@ -138,7 +138,7 @@ func (r *aggregatorRole) ProcessTemplates(workflowRepo *repos.Repo) (err error)
138138
// Process templates for child roles
139139
for _, role := range r.Roles {
140140
role.setParent(r)
141-
err = role.ProcessTemplates(workflowRepo)
141+
err = role.ProcessTemplates(workflowRepo, loadSubworkflow)
142142
if err != nil {
143143
return
144144
}

core/workflow/callrole.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (t *callRole) GlobFilter(g glob.Glob) (rs []Role) {
130130
return
131131
}
132132

133-
func (t *callRole) ProcessTemplates(workflowRepo *repos.Repo) (err error) {
133+
func (t *callRole) ProcessTemplates(_ *repos.Repo, _ LoadSubworkflowFunc) (err error) {
134134
if t == nil {
135135
return errors.New("role tree error when processing templates")
136136
}

core/workflow/calltemplate.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2021 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package workflow
26+
27+
import (
28+
"text/template"
29+
"errors"
30+
"github.com/jinzhu/copier"
31+
)
32+
33+
type callTemplate struct {
34+
callRole
35+
stringTemplates map[string]template.Template `yaml:"-,omitempty"`
36+
}
37+
38+
func (tt *callTemplate) copy() copyable {
39+
rCopy := callTemplate{
40+
callRole: *tt.callRole.copy().(*callRole),
41+
}
42+
_ = copier.Copy(&rCopy.stringTemplates, &tt.stringTemplates)
43+
return &rCopy
44+
}
45+
46+
func (tt *callTemplate) UnmarshalYAML(unmarshal func(interface{}) error) (err error) {
47+
type _callTemplate callTemplate
48+
aux := _callTemplate{}
49+
err = unmarshal(&aux)
50+
if err != nil {
51+
return
52+
}
53+
aux.stringTemplates = make(map[string]template.Template)
54+
55+
/* template cache builder
56+
tmpl := template.New(aux.GetPath())
57+
58+
// Fields to parse as templates:
59+
for _, str := range []string{
60+
aux.LoadTaskClass,
61+
aux.Name,
62+
} {
63+
var tempTmpl *template.Template
64+
tempTmpl, err = tmpl.Parse(str)
65+
if err != nil {
66+
return
67+
}
68+
aux.stringTemplates[str] = *tempTmpl
69+
}
70+
*/
71+
72+
*tt = callTemplate(aux)
73+
return
74+
}
75+
76+
func (tt *callTemplate) generateRole(localVars map[string]string) (c Role, err error) {
77+
if tt == nil {
78+
return nil, errors.New("cannot generate from nil sender")
79+
}
80+
81+
// See note for aggregatorTemplate.generateRole
82+
tr := *tt.callRole.copy().(*callRole)
83+
for k, v := range localVars {
84+
tr.Locals[k] = v
85+
}
86+
87+
c = &tr
88+
return
89+
}

core/workflow/includerole.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2021 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package workflow
26+
27+
import (
28+
"errors"
29+
"strings"
30+
texttemplate "text/template"
31+
32+
"github.com/AliceO2Group/Control/configuration/template"
33+
"github.com/AliceO2Group/Control/core/repos"
34+
"github.com/AliceO2Group/Control/core/task"
35+
"github.com/AliceO2Group/Control/core/the"
36+
)
37+
38+
// An includeRole is a delayed aggregatorRole
39+
// It takes an `include` entry, which is then expanded into a full aggregatorRole
40+
// during the ProcessTemplates function.
41+
type includeRole struct {
42+
aggregatorRole
43+
44+
Include string `yaml:"include,omitempty"`
45+
}
46+
47+
func (r *includeRole) UnmarshalYAML(unmarshal func(interface{}) error) (err error) {
48+
// NOTE: see NOTE in roleBase.UnmarshalYAML
49+
50+
innerRoleBase := roleBase{}
51+
err = unmarshal(&innerRoleBase)
52+
if err != nil {
53+
return
54+
}
55+
56+
type auxInclude struct{
57+
Include string `yaml:"include,omitempty"`
58+
}
59+
_auxInclude := auxInclude{}
60+
err = unmarshal(&_auxInclude)
61+
if err != nil {
62+
return
63+
}
64+
65+
role := includeRole{
66+
aggregatorRole: aggregatorRole{
67+
roleBase: innerRoleBase,
68+
aggregator: aggregator{},
69+
},
70+
Include: _auxInclude.Include,
71+
}
72+
73+
74+
*r = role
75+
return
76+
}
77+
78+
func (r *includeRole) MarshalYAML() (interface{}, error) {
79+
return nil, nil
80+
}
81+
82+
func (r *includeRole) ProcessTemplates(workflowRepo *repos.Repo, loadSubworkflow LoadSubworkflowFunc) (err error) {
83+
if r == nil {
84+
return errors.New("role tree error when processing templates")
85+
}
86+
87+
templSequence := template.Sequence{
88+
template.STAGE0: template.WrapMapItems(r.Defaults.Raw()),
89+
template.STAGE1: template.WrapMapItems(r.Vars.Raw()),
90+
template.STAGE2: template.WrapMapItems(r.UserVars.Raw()),
91+
template.STAGE3: template.Fields{
92+
template.WrapPointer(&r.Name),
93+
template.WrapPointer(&r.Include),
94+
},
95+
template.STAGE4: append(append(
96+
WrapConstraints(r.Constraints),
97+
r.wrapConnectFields()...),
98+
template.WrapPointer(&r.Enabled)),
99+
}
100+
101+
// TODO: push cached templates here
102+
err = templSequence.Execute(the.ConfSvc(), r.GetPath(), template.VarStack{
103+
Locals: r.Locals,
104+
Defaults: r.Defaults,
105+
Vars: r.Vars,
106+
UserVars: r.UserVars,
107+
}, r.makeBuildObjectStackFunc(), make(map[string]texttemplate.Template))
108+
if err != nil {
109+
return
110+
}
111+
112+
// After template processing we write the Locals to Vars in order to make them available to children
113+
for k, v := range r.Locals {
114+
r.Vars.Set(k, v)
115+
}
116+
117+
r.Enabled = strings.TrimSpace(r.Enabled)
118+
119+
// Common part done, include resolution starts here.
120+
// An includeRole is essentially a baseRole + `include:` expression. We first need to resolve
121+
// the expression to obtain a full subworkflow template identifier, i.e. a full repo/wft/branch
122+
// combo.
123+
// Once that's done we can load the subworkflow and obtain the root `aggregatorRole` plus a new
124+
// repos.Repo definition. If a repo or branch is already provided in the subworkflow expression
125+
// then the returned newWfRepo will reflect this, and any additionally nested includes will
126+
// default to the repo of their direct parent.
127+
var subWfRoot *aggregatorRole
128+
var newWfRepo *repos.Repo
129+
include := workflowRepo.ResolveSubworkflowTemplateIdentifier(r.Include)
130+
subWfRoot, newWfRepo, err = loadSubworkflow(include, r)
131+
if err != nil {
132+
return err
133+
}
134+
135+
// By now the subworkflow is loaded and reparented to this includeRole. This reparenting is
136+
// needed to ensure the correct gera.StringMap hierarchies, but now that we replace the
137+
// composed aggregatorRole with the newly loaded one, we must also fix the reparenting and
138+
// ensure the loaded name doesn't overwrite the original name of the includeRole.
139+
parent := r.parent
140+
name := r.Name
141+
r.aggregatorRole = *subWfRoot // The previously composed aggregatorRole+roleBase are overwritten here
142+
r.parent = parent
143+
r.Name = name
144+
145+
// Process templates for child roles
146+
for _, role := range r.Roles {
147+
role.setParent(r)
148+
err = role.ProcessTemplates(newWfRepo, loadSubworkflow)
149+
if err != nil {
150+
return
151+
}
152+
}
153+
154+
// If any child is not Enabled after template resolution,
155+
// we filter it out of existence
156+
enabledRoles := make([]Role, 0)
157+
for _, role := range r.Roles {
158+
if role.IsEnabled() {
159+
enabledRoles = append(enabledRoles, role)
160+
}
161+
}
162+
r.Roles = enabledRoles
163+
164+
return
165+
}
166+
167+
func (r* includeRole) UpdateStatus(s task.Status) {
168+
r.updateStatus(s)
169+
}
170+
171+
func (r* includeRole) UpdateState(s task.State) {
172+
r.updateState(s)
173+
}
174+
175+
func (r *includeRole) copy() copyable {
176+
rCopy := includeRole{
177+
aggregatorRole: *r.aggregatorRole.copy().(*aggregatorRole),
178+
Include: r.Include,
179+
}
180+
rCopy.status = SafeStatus{status:rCopy.GetStatus()}
181+
rCopy.state = SafeState{state:rCopy.GetState()}
182+
return &rCopy
183+
}

0 commit comments

Comments
 (0)