Skip to content

Commit f6f9116

Browse files
committed
hack: wait for worker nodes and register targets with ingress LB
FIXME: CCM and in-cluster MAPI/CAPI need to handle this
1 parent 6666a29 commit f6f9116

1 file changed

Lines changed: 212 additions & 0 deletions

File tree

cmd/openshift-install/command/waitfor.go

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,17 @@ import (
44
"context"
55
"crypto/x509"
66
"fmt"
7+
"net/url"
78
"os"
89
"path/filepath"
910
"strings"
1011
"time"
1112

13+
configv2 "github.com/aws/aws-sdk-go-v2/config"
14+
"github.com/aws/aws-sdk-go-v2/service/ec2"
15+
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
16+
elbv2 "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
17+
elbv2types "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types"
1218
"github.com/pkg/errors"
1319
"github.com/sirupsen/logrus"
1420
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -18,11 +24,13 @@ import (
1824
"k8s.io/apimachinery/pkg/util/sets"
1925
"k8s.io/apimachinery/pkg/util/wait"
2026
"k8s.io/apimachinery/pkg/watch"
27+
"k8s.io/client-go/informers"
2128
"k8s.io/client-go/kubernetes"
2229
"k8s.io/client-go/rest"
2330
"k8s.io/client-go/tools/cache"
2431
"k8s.io/client-go/tools/clientcmd"
2532
clientwatch "k8s.io/client-go/tools/watch"
33+
"k8s.io/utils/ptr"
2634

2735
configv1 "github.com/openshift/api/config/v1"
2836
configclient "github.com/openshift/client-go/config/clientset/versioned"
@@ -33,6 +41,7 @@ import (
3341
"github.com/openshift/installer/pkg/asset/agent/agentconfig"
3442
"github.com/openshift/installer/pkg/asset/installconfig"
3543
timer "github.com/openshift/installer/pkg/metrics/timer"
44+
"github.com/openshift/installer/pkg/types/aws"
3645
"github.com/openshift/installer/pkg/types/baremetal"
3746
cov1helpers "github.com/openshift/library-go/pkg/config/clusteroperator/v1helpers"
3847
"github.com/openshift/library-go/pkg/route/routeapihelpers"
@@ -63,6 +72,12 @@ var SkipPasswordPrintFlag bool
6372
// WaitForInstallComplete waits for cluster to complete installation, checks for operator stability
6473
// and logs cluster information when successful.
6574
func WaitForInstallComplete(ctx context.Context, config *rest.Config, assetstore asset.Store) error {
75+
// FIXME: Register the worker nodes to target group of ingress LB.
76+
// Remove after CCM support dualstack NLB.
77+
if err := waitForWorkerNodesAvailability(ctx, config, assetstore); err != nil {
78+
return err
79+
}
80+
6681
if err := waitForInitializedCluster(ctx, config, assetstore); err != nil {
6782
return err
6883
}
@@ -83,6 +98,203 @@ func WaitForInstallComplete(ctx context.Context, config *rest.Config, assetstore
8398
return logComplete(RootOpts.Dir, consoleURL)
8499
}
85100

101+
// waitForWorkerNodesAvailability waits for worker nodes to be running and register them with the TargetGroup of the ingress NLB.
102+
// NOTE: This should be handled by the CCM, not the installer.
103+
func waitForWorkerNodesAvailability(ctx context.Context, config *rest.Config, assetstore asset.Store) error {
104+
timer.StartTimer("CCM: Worker nodes Available")
105+
icAsset, err := assetstore.Load(&installconfig.InstallConfig{})
106+
if err != nil {
107+
return fmt.Errorf("failed to load installconfig: %w", err)
108+
}
109+
if icAsset == nil {
110+
return fmt.Errorf("failed to installconfig: received nil")
111+
}
112+
113+
ic := icAsset.(*installconfig.InstallConfig).Config
114+
115+
// Nothing to do!
116+
if ic.Platform.AWS == nil {
117+
return nil
118+
}
119+
120+
ipFamily := ic.Platform.AWS.IPFamily
121+
// Nothing to do!
122+
if ipFamily != aws.DualStackIPv4PrimaryIPFamily && ipFamily != aws.DualStackIPv6PrimaryIPFamily {
123+
return nil
124+
}
125+
126+
region := ic.Platform.AWS.Region
127+
128+
// FIXME: Ignore edge compute pool
129+
numOfNodes := ptr.Deref(ic.Compute[0].Replicas, 0) + ptr.Deref(ic.ControlPlane.Replicas, 0)
130+
if numOfNodes == 0 {
131+
// nothing to do, but should not happen
132+
return nil
133+
}
134+
135+
nodeCheckDuration := 10 * time.Minute
136+
nodeContext, cancel := context.WithTimeout(ctx, nodeCheckDuration)
137+
defer cancel()
138+
139+
untilTime := time.Now().Add(nodeCheckDuration)
140+
timezone, _ := untilTime.Zone()
141+
logrus.Infof("CCM: Waiting up to %v (until %v %s) to ensure worker nodes are available and registered with ingress LB...",
142+
nodeCheckDuration, untilTime.Format(time.Kitchen), timezone)
143+
144+
cc, err := kubernetes.NewForConfig(config)
145+
if err != nil {
146+
return fmt.Errorf("failed to create a config client: %w", err)
147+
}
148+
configInformers := informers.NewSharedInformerFactory(cc, 0)
149+
nodeInformer := configInformers.Core().V1().Nodes().Informer()
150+
nodeLister := configInformers.Core().V1().Nodes().Lister()
151+
configInformers.Start(ctx.Done())
152+
if !cache.WaitForCacheSync(ctx.Done(), nodeInformer.HasSynced) {
153+
return fmt.Errorf("informers never started")
154+
}
155+
156+
// Create clients to call AWS API
157+
// FIXME: Let's ignore the custom endpoints for now
158+
cfg, err := configv2.LoadDefaultConfig(ctx, configv2.WithRegion(region))
159+
if err != nil {
160+
return fmt.Errorf("failed to load AWS config: %w", err)
161+
}
162+
elbv2Client := elbv2.NewFromConfig(cfg)
163+
ec2Client := ec2.NewFromConfig(cfg)
164+
165+
waitErr := wait.PollUntilContextCancel(nodeContext, 1*time.Second, true, func(ctx context.Context) (done bool, err error) {
166+
// If the expected number of nodes are running, proceed.
167+
// Otherwise, requeue.
168+
nodes, err := nodeLister.List(labels.Everything())
169+
if err != nil {
170+
return false, fmt.Errorf("failed to get nodes: %w", err)
171+
}
172+
if len(nodes) < int(numOfNodes) {
173+
return false, nil
174+
}
175+
176+
// Convert nodes to EC2 instance IDs
177+
var instanceIDs []string
178+
for _, node := range nodes {
179+
url, err := url.Parse(node.Spec.ProviderID)
180+
if err != nil {
181+
return false, fmt.Errorf("invalid node provider ID (%s): %w", node.Spec.ProviderID, err)
182+
}
183+
if url.Scheme != "aws" {
184+
return false, fmt.Errorf("invalid scheme for AWS instance (%s)", node.Spec.ProviderID)
185+
}
186+
187+
awsID := ""
188+
tokens := strings.Split(strings.Trim(url.Path, "/"), "/")
189+
// last token in the providerID is the aws resource ID for both EC2 and Fargate nodes
190+
if len(tokens) > 0 {
191+
awsID = tokens[len(tokens)-1]
192+
}
193+
instanceIDs = append(instanceIDs, awsID)
194+
}
195+
196+
// Enable Primary IPv6 flag if dual-stack IPv6 primary
197+
// FIXME: This should be done by MAPI when creating the instances.
198+
if ic.AWS.IPFamily == aws.DualStackIPv6PrimaryIPFamily {
199+
instances, err := ec2Client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
200+
InstanceIds: instanceIDs,
201+
})
202+
if err != nil {
203+
return false, fmt.Errorf("failed to describe ec2 instances for nodes: %w", err)
204+
}
205+
206+
// FIXME: Assume all instances have at least 1 ENI with 1 as primary ENI
207+
for _, reservation := range instances.Reservations {
208+
for _, instance := range reservation.Instances {
209+
var primaryENI ec2types.InstanceNetworkInterface
210+
for _, eni := range instance.NetworkInterfaces {
211+
if ptr.Deref(eni.Attachment.DeviceIndex, 0) == 0 {
212+
primaryENI = eni
213+
break
214+
}
215+
}
216+
217+
enabled := true
218+
_, err := ec2Client.ModifyNetworkInterfaceAttribute(ctx, &ec2.ModifyNetworkInterfaceAttributeInput{
219+
EnablePrimaryIpv6: &enabled,
220+
NetworkInterfaceId: primaryENI.NetworkInterfaceId,
221+
})
222+
if err != nil {
223+
return false, fmt.Errorf("failed to set primary ipv6 for instance %s: %w", *instance.InstanceId, err)
224+
}
225+
}
226+
}
227+
}
228+
229+
// Get the NodePort service for default ingress
230+
// Reference: oc -n openshift-ingress get svc router-nodeport-default -o=wide
231+
svc, err := cc.CoreV1().Services("openshift-ingress").Get(nodeContext, "router-nodeport-default", metav1.GetOptions{})
232+
if err != nil {
233+
// The service is not yet created by CIO, wait a bit more
234+
if apierrors.IsNotFound(err) {
235+
return false, nil
236+
}
237+
return false, fmt.Errorf("failed to get service openshift-ingress/router-nodeport-default: %w", err)
238+
}
239+
240+
// Check if nodes are already registered. If true, nothing to do.
241+
targetGrp, err := elbv2Client.DescribeTargetGroups(nodeContext, &elbv2.DescribeTargetGroupsInput{
242+
Names: []string{fmt.Sprintf("%.20s-ingress-%d", ic.GetName(), 443), fmt.Sprintf("%.20s-ingress-%d", ic.GetName(), 80)},
243+
})
244+
if err != nil {
245+
return false, fmt.Errorf("failed to describe target group: %w", err)
246+
}
247+
248+
for _, tg := range targetGrp.TargetGroups {
249+
// Get registered targets by querying target health API.
250+
targetDesc, err := elbv2Client.DescribeTargetHealth(nodeContext, &elbv2.DescribeTargetHealthInput{
251+
TargetGroupArn: tg.TargetGroupArn,
252+
})
253+
if err != nil {
254+
return false, fmt.Errorf("failed to get target health for target group %s: %w", *tg.TargetGroupArn, err)
255+
}
256+
257+
// No targets found. We will register nodes as targets.
258+
if len(targetDesc.TargetHealthDescriptions) == 0 {
259+
var nodeport int32
260+
for _, port := range svc.Spec.Ports {
261+
if port.Port == *tg.Port {
262+
nodeport = port.NodePort
263+
break
264+
}
265+
}
266+
267+
var targets []elbv2types.TargetDescription
268+
for _, instanceID := range instanceIDs {
269+
targets = append(targets, elbv2types.TargetDescription{
270+
Port: &nodeport,
271+
Id: &instanceID,
272+
})
273+
}
274+
275+
// Register worker and controlplane nodes with TargetGroup of ingress NLB.
276+
_, err = elbv2Client.RegisterTargets(nodeContext, &elbv2.RegisterTargetsInput{
277+
TargetGroupArn: tg.TargetGroupArn,
278+
Targets: targets,
279+
})
280+
if err != nil {
281+
return false, fmt.Errorf("failed to register nodes to ingress LB: %w", err)
282+
}
283+
}
284+
}
285+
286+
return true, nil
287+
})
288+
if waitErr != nil {
289+
return fmt.Errorf("failed to wait for worker node availability: %w", waitErr)
290+
}
291+
292+
timer.StopTimer("CCM: Worker nodes Available")
293+
294+
logrus.Info("CCM: Worker nodes are available and registered with ingress LB")
295+
return nil
296+
}
297+
86298
// waitForInitializedCluster watches the ClusterVersion waiting for confirmation
87299
// that the cluster has been initialized.
88300
func waitForInitializedCluster(ctx context.Context, config *rest.Config, assetstore asset.Store) error {

0 commit comments

Comments
 (0)