Skip to content

Commit 76ea5f7

Browse files
committed
[core] Fix integration plugin loading
1 parent 97a72c2 commit 76ea5f7

5 files changed

Lines changed: 23 additions & 9 deletions

File tree

core/config.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,9 @@ func setDefaults() error {
9393
viper.SetDefault("verbose", false)
9494
viper.SetDefault("veryVerbose", false)
9595
viper.SetDefault("dumpWorkflows", false)
96-
viper.SetDefault("configServiceUri", "apricot://localhost:47101")
97-
viper.SetDefault("dcsServiceEndpoint", "localhost:50051")
96+
viper.SetDefault("configServiceUri", "apricot://127.0.0.1:47101")
97+
viper.SetDefault("dcsServiceEndpoint", "127.0.0.1:50051")
98+
viper.SetDefault("ddSchedulerEndpoint", "127.0.0.1:50052")
9899
viper.SetDefault("integrationPlugins", []string{})
99100
viper.SetDefault("coreConfigEntry", "settings")
100101
viper.SetDefault("fmqPlugin", "OCClite")
@@ -139,6 +140,7 @@ func setFlags() error {
139140
pflag.Bool("dumpWorkflows", viper.GetBool("dumpWorkflows"), "Dump unprocessed and processed workflow files (`$PWD/wf-{,un}processed-<timestamp>.json`)")
140141
pflag.String("configServiceUri", viper.GetString("configServiceUri"), "URI of the Apricot instance (`apricot://host:port`), Consul server (`consul://`) or YAML configuration file, entry point for all configuration")
141142
pflag.String("dcsServiceEndpoint", viper.GetString("dcsServiceEndpoint"), "Endpoint of the DCS gRPC service (`host:port`)")
143+
pflag.String("ddSchedulerEndpoint", viper.GetString("ddSchedulerEndpoint"), "Endpoint of the DD scheduler gRPC service (`host:port`)")
142144
pflag.StringSlice("integrationPlugins", viper.GetStringSlice("integrationPlugins"), "List of integration plugins to load (default: empty)")
143145
pflag.String("coreConfigEntry", viper.GetString("coreConfigEntry"), "key for AliECS core configuration within the `aliecs` component [EXPERT SETTING]")
144146
pflag.String("fmqPlugin", viper.GetString("fmqPlugin"), "Name of the plugin for FairMQ tasks")

core/integration/ddsched/plugin.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import (
4242
"google.golang.org/grpc/connectivity"
4343
)
4444

45-
const DCS_CALL_TIMEOUT = 10 * time.Second
45+
const DDSCHED_CALL_TIMEOUT = 10 * time.Second
4646

4747

4848
type Plugin struct {
@@ -81,7 +81,7 @@ func (p *Plugin) GetName() string {
8181

8282
func (p *Plugin) Init(_ string) error {
8383
if p.ddSchedClient == nil {
84-
callTimeout := DCS_CALL_TIMEOUT
84+
callTimeout := DDSCHED_CALL_TIMEOUT
8585
cxt, cancel := context.WithTimeout(context.Background(), callTimeout)
8686
p.ddSchedClient = NewClient(cxt, cancel, viper.GetString("ddSchedulerEndpoint"))
8787
if p.ddSchedClient == nil {

core/integration/plugin.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ import (
3434

3535
var log = logger.New(logrus.StandardLogger(), "integration")
3636

37-
var(
38-
once sync.Once
37+
var (
38+
once sync.Once
39+
instance Plugins
40+
41+
loaderOnce sync.Once
3942
pluginLoaders map[string]func() Plugin
40-
instance Plugins
4143
)
4244

4345
type Plugins []Plugin
@@ -52,7 +54,7 @@ type Plugin interface {
5254
type NewFunc func(endpoint string) Plugin
5355

5456
func RegisterPlugin(pluginName string, endpointArgumentName string, newFunc NewFunc) {
55-
once.Do(func() {
57+
loaderOnce.Do(func() {
5658
pluginLoaders = make(map[string]func() Plugin)
5759
})
5860
pluginLoaders[pluginName] = func() Plugin {

core/workflow/callable/call.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func (c *Call) Call() error {
104104
template.WrapPointer(&returnVar),
105105
}
106106
c.VarStack["run_number"] = strconv.FormatUint(uint64(c.parentRole.GetCurrentRunNumber()), 10 )
107+
c.VarStack["environment_id"] = c.parentRole.GetEnvironmentId().String()
107108
objStack := integration.PluginsInstance().ObjectStack(c)
108109

109110
err := fields.Execute(apricot.Instance(), c.GetName(), c.VarStack, objStack, make(map[string]texttemplate.Template))

core/workflow/roleutils.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ func Walk(root Role, do func(role Role)) {
6969
for _, child := range typed.Roles {
7070
Walk(child, do)
7171
}
72+
case *iteratorRole:
73+
do(typed)
74+
for _, child := range typed.Roles {
75+
LeafWalk(child, do)
76+
}
7277
case *taskRole:
7378
do(typed)
7479
case *callRole:
@@ -80,7 +85,11 @@ func LeafWalk(root Role, do func(role Role)) {
8085
switch typed := root.(type) {
8186
case *aggregatorRole:
8287
for _, child := range typed.Roles {
83-
Walk(child, do)
88+
LeafWalk(child, do)
89+
}
90+
case *iteratorRole:
91+
for _, child := range typed.Roles {
92+
LeafWalk(child, do)
8493
}
8594
case *taskRole:
8695
do(typed)

0 commit comments

Comments
 (0)