Skip to content

Commit 1b695a2

Browse files
committed
[core] DD scheduler integration plugin
1 parent 20cad8e commit 1b695a2

15 files changed

Lines changed: 1219 additions & 21 deletions

File tree

Makefile

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ BUILD_FLAGS=$(CGO_LDFLAGS) $(BUILD_ENV_FLAGS)
3838
endif
3939
REPOPATH = github.com/AliceO2Group/Control
4040
ODC_PROTO="https://raw.githubusercontent.com/FairRootGroup/ODC/master/grpc-proto/odc.proto"
41+
DD_PROTO="https://raw.githubusercontent.com/AliceO2Group/DataDistribution/master/src/DataDistControl/DataDistControl.proto"
4142

4243
VERBOSE_1 := -v
4344
VERBOSE_2 := -v -x
@@ -54,7 +55,7 @@ WHAT_o2-apricot_BUILD_FLAGS=$(BUILD_ENV_FLAGS)
5455
INSTALL_WHAT:=$(patsubst %, install_%, $(WHAT))
5556

5657

57-
GENERATE_DIRS := ./apricot ./coconut/cmd ./core ./executor ./core/integration/dcs ./odcshim ./walnut
58+
GENERATE_DIRS := ./apricot ./coconut/cmd ./core ./core/integration/dcs ./core/integration/ddsched ./executor ./odcshim ./walnut
5859
SRC_DIRS := ./apricot ./cmd/* ./core ./coconut ./executor ./common ./configuration ./occ/peanut ./odcshim ./walnut
5960

6061
# Use linker flags to provide version/build settings to the target
@@ -126,10 +127,15 @@ vendor:
126127
@echo -e "\033[1;33mgo mod vendor\033[0m"
127128
@go mod vendor
128129

130+
# FIXME: these two protofiles should be committed in order to be available offline
129131
@echo -e "\033[1;33mcurl odc.proto\033[0m"
130132
@mkdir -p odcshim/odcprotos
131133
@curl -s -L $(ODC_PROTO) -o odcshim/odcprotos/odc.proto
132134

135+
@echo -e "\033[1;33mcurl ddsched.proto\033[0m"
136+
@mkdir -p core/integration/ddsched/protos
137+
@curl -s -L $(DD_PROTO) -o core/integration/ddsched/protos/ddsched.proto
138+
133139
# WORKAROUND: In order to avoid the following issues:
134140
# https://github.com/golang/protobuf/issues/992
135141
# https://github.com/golang/protobuf/issues/1158

cmd/o2-aliecs-core/main.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,24 @@ import (
2929

3030
"github.com/AliceO2Group/Control/common/logger/infologger"
3131
"github.com/AliceO2Group/Control/core"
32+
"github.com/AliceO2Group/Control/core/integration"
33+
"github.com/AliceO2Group/Control/core/integration/dcs"
34+
"github.com/AliceO2Group/Control/core/integration/ddsched"
3235
log "github.com/sirupsen/logrus"
3336
"github.com/teo/logrus-prefixed-formatter"
3437
)
3538

3639
func init() {
40+
// TODO: enable/disable switches for plugins
41+
integration.RegisterPlugin(
42+
"dcs",
43+
"dcsServiceEndpoint",
44+
dcs.NewPlugin)
45+
integration.RegisterPlugin(
46+
"ddsched",
47+
"ddSchedulerEndpoint",
48+
ddsched.NewPlugin)
49+
3750
log.SetFormatter(&prefixed.TextFormatter{
3851
FullTimestamp: true,
3952
SpacePadding: 20,

core/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func setFlags() error {
139139
pflag.Bool("dumpWorkflows", viper.GetBool("dumpWorkflows"), "Dump unprocessed and processed workflow files (`$PWD/wf-{,un}processed-<timestamp>.json`)")
140140
pflag.String("configServiceUri", viper.GetString("configServiceUri"), "URI of the Apricot instance (`apricot://host:port`), Consul server (`consul://`) or YAML configuration file, entry point for all configuration")
141141
pflag.String("dcsServiceEndpoint", viper.GetString("dcsServiceEndpoint"), "Endpoint of the DCS gRPC service (`host:port`)")
142-
pflag.String("integrationPlugins", viper.GetString("integrationPlugins"), "List of integration plugins to load (default: empty)")
142+
pflag.StringSlice("integrationPlugins", viper.GetStringSlice("integrationPlugins"), "List of integration plugins to load (default: empty)")
143143
pflag.String("coreConfigEntry", viper.GetString("coreConfigEntry"), "key for AliECS core configuration within the `aliecs` component [EXPERT SETTING]")
144144
pflag.String("fmqPlugin", viper.GetString("fmqPlugin"), "Name of the plugin for FairMQ tasks")
145145
pflag.String("fmqPluginSearchPath", viper.GetString("fmqPluginSearchPath"), "Path to the directory where the FairMQ plugins are found on controlled nodes")

core/integration/dcs/plugin.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ import (
3535
"strconv"
3636
"time"
3737

38+
"github.com/AliceO2Group/Control/core/integration"
3839
dcspb "github.com/AliceO2Group/Control/core/integration/dcs/protos"
40+
"github.com/AliceO2Group/Control/core/workflow/callable"
3941
"github.com/spf13/viper"
4042
"google.golang.org/grpc"
4143
"google.golang.org/grpc/connectivity"
@@ -50,7 +52,7 @@ type Plugin struct {
5052
dcsClient *RpcClient
5153
}
5254

53-
func NewPlugin(endpoint string) *Plugin {
55+
func NewPlugin(endpoint string) integration.Plugin {
5456
u, err := url.Parse(endpoint)
5557
if err != nil {
5658
log.WithField("endpoint", endpoint).
@@ -105,7 +107,12 @@ func (p *Plugin) Init(instanceId string) error {
105107
return nil
106108
}
107109

108-
func (p *Plugin) ObjectStack(varStack map[string]string) (stack map[string]interface{}) {
110+
func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
111+
call, ok := data.(*callable.Call)
112+
if !ok {
113+
return
114+
}
115+
varStack := call.VarStack
109116
stack = make(map[string]interface{})
110117
stack["StartOfRun"] = func() (out string) { // must formally return string even when we return nothing
111118
log.Debug("performing DCS SOR")

core/integration/dcs/protos/dcs.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/integration/ddsched/client.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2021 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package ddsched
26+
27+
import (
28+
"context"
29+
30+
"github.com/AliceO2Group/Control/common/logger"
31+
ddpb "github.com/AliceO2Group/Control/core/integration/ddsched/protos"
32+
"github.com/sirupsen/logrus"
33+
"google.golang.org/grpc"
34+
"google.golang.org/grpc/connectivity"
35+
)
36+
37+
var log = logger.New(logrus.StandardLogger(),"ddschedclient")
38+
39+
40+
type RpcClient struct {
41+
ddpb.DataDistributionControlClient
42+
conn *grpc.ClientConn
43+
}
44+
45+
func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string) *RpcClient {
46+
log.WithFields(logrus.Fields{
47+
"endpoint": endpoint,
48+
}).Debug("dialing DD scheduler client")
49+
conn, err := grpc.DialContext(cxt, endpoint, grpc.WithInsecure(), grpc.WithBlock())
50+
if err != nil {
51+
log.WithField("error", err.Error()).
52+
WithField("endpoint", endpoint).
53+
Errorf("cannot dial RPC endpoint")
54+
cancel()
55+
return nil
56+
}
57+
58+
client := &RpcClient {
59+
DataDistributionControlClient: ddpb.NewDataDistributionControlClient(conn),
60+
conn: conn,
61+
}
62+
63+
return client
64+
}
65+
66+
func (m *RpcClient) GetConnState() connectivity.State {
67+
if m.conn == nil {
68+
return connectivity.Idle
69+
}
70+
return m.conn.GetState()
71+
}
72+
73+
func (m *RpcClient) Close() error {
74+
return m.conn.Close()
75+
}

0 commit comments

Comments
 (0)