1 change: 1 addition & 0 deletions README.md
@@ -429,6 +429,7 @@ Changes to the following annotations causes pools to be recreated and cause an e
- `k8s.cloudscale.ch/loadbalancer-pool-algorithm`
- `k8s.cloudscale.ch/loadbalancer-pool-protocol`
- `k8s.cloudscale.ch/loadbalancer-listener-allowed-subnets`
- `k8s.cloudscale.ch/loadbalancer-node-selector`
Contributor:
I believe this might not be the correct category. Firstly, your implementation does not recreate the pools. Secondly, if executed correctly, this change should not result in any downtime (the approach would be to schedule pods on the new node, then add the new node and remove the old one).


Additionally, changes to `spec.externalTrafficPolicy` have the same effect.

57 changes: 57 additions & 0 deletions examples/nginx-hello-nodeselector.yml
@@ -0,0 +1,57 @@
# Deploys the docker.io/nginxdemos/hello:plain-text container and creates a
# loadbalancer service with a node-selector annotation for it:
#
# export KUBECONFIG=path/to/kubeconfig
# kubectl apply -f nginx-hello.yml
Contributor:
nit: nginx-hello-nodeselector.yml

#
# Wait for `kubectl describe service hello` to show "Loadbalancer Ensured",
# then use the IP address found under "LoadBalancer Ingress" to connect to the
# service.
#
# You can also use the following shortcut:
#
# curl http://$(kubectl get service hello -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
#
# If you follow the nginx log, you will see that nginx sees a cluster internal
# IP address as source of requests:
#
# kubectl logs -l "app=hello"
#
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: hello
spec:
replicas: 2
selector:
matchLabels:
app: hello
template:
metadata:
labels:
app: hello
spec:
containers:
- name: hello
image: docker.io/nginxdemos/hello:plain-text
nodeSelector:
kubernetes.io/hostname: k8test-worker-2
---
apiVersion: v1
kind: Service
metadata:
labels:
app: hello
annotations:
k8s.cloudscale.ch/loadbalancer-node-selector: "kubernetes.io/hostname=k8test-worker-2"
name: hello
spec:
ports:
- port: 80
protocol: TCP
targetPort: 80
name: http
selector:
app: hello
type: LoadBalancer
Contributor:
suggestion: I am wondering if this example would be more interesting with externalTrafficPolicy: Local?

32 changes: 28 additions & 4 deletions pkg/cloudscale_ccm/cloud.go
@@ -9,9 +9,12 @@ import (
"strings"
"time"

cloudscale "github.com/cloudscale-ch/cloudscale-go-sdk/v6"
"github.com/cloudscale-ch/cloudscale-go-sdk/v6"
"golang.org/x/oauth2"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

cloudprovider "k8s.io/cloud-provider"
@@ -32,6 +35,8 @@ const (
type cloud struct {
instances *instances
loadbalancer *loadbalancer

eventRecorder record.EventRecorder
}

// Register this provider with Kubernetes.
@@ -112,8 +117,27 @@ func (c *cloud) Initialize(

// This cannot be configured earlier, even though it seems better situated
// in newCloudscaleClient
c.loadbalancer.k8s = kubernetes.NewForConfigOrDie(
clientBuilder.ConfigOrDie("cloudscale-cloud-controller-manager"))
c.loadbalancer.k8s = clientBuilder.ClientOrDie(
"cloudscale-cloud-controller-manager",
)

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{
Interface: c.loadbalancer.k8s.CoreV1().Events(""),
})
c.eventRecorder = eventBroadcaster.NewRecorder(scheme.Scheme,
corev1.EventSource{
Component: "cloudscale-cloud-controller-manager",
},
)

go func() {
// wait until stop chan closes
<-stop
eventBroadcaster.Shutdown()
}()

c.loadbalancer.recorder = c.eventRecorder
}

// LoadBalancer returns a balancer interface. Also returns true if the
72 changes: 61 additions & 11 deletions pkg/cloudscale_ccm/loadbalancer.go
@@ -2,7 +2,6 @@ package cloudscale_ccm

import (
"context"
"errors"
"fmt"
"slices"
"strings"
@@ -11,7 +10,9 @@ import (
"github.com/cloudscale-ch/cloudscale-go-sdk/v6"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
@@ -208,7 +209,7 @@ const (
// connections timing out while the monitor is updated.
LoadBalancerHealthMonitorTimeoutS = "k8s.cloudscale.ch/loadbalancer-health-monitor-timeout-s"

// LoadBalancerHealthMonitorDownThreshold is the number of the checks that
// LoadBalancerHealthMonitorUpThreshold is the number of the checks that
// need to succeed before a pool member is considered up. Defaults to 2.
LoadBalancerHealthMonitorUpThreshold = "k8s.cloudscale.ch/loadbalancer-health-monitor-up-threshold"

@@ -278,7 +279,7 @@ const (
// Changing this annotation on an established service is considered safe.
LoadBalancerListenerTimeoutMemberDataMS = "k8s.cloudscale.ch/loadbalancer-timeout-member-data-ms"

// LoadBalancerSubnetLimit is a JSON list of subnet UUIDs that the
// LoadBalancerListenerAllowedSubnets is a JSON list of subnet UUIDs that the
// loadbalancer should use. By default, all subnets of a node are used:
//
// * `[]` means that anyone is allowed to connect (default).
@@ -291,12 +292,17 @@ const (
// This is an advanced feature, useful if you have nodes that are in
// multiple private subnets.
LoadBalancerListenerAllowedSubnets = "k8s.cloudscale.ch/loadbalancer-listener-allowed-subnets"

// LoadBalancerNodeSelector can be set to restrict which nodes are added to the LB pool.
// It accepts a standard Kubernetes label selector string.
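// For example, "kubernetes.io/hostname=worker-2" restricts the pool to a
// single node; any valid label selector expression is accepted.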
LoadBalancerNodeSelector = "k8s.cloudscale.ch/loadbalancer-node-selector"
)

type loadbalancer struct {
lbs lbMapper
srv serverMapper
k8s kubernetes.Interface
lbs lbMapper
srv serverMapper
k8s kubernetes.Interface
recorder record.EventRecorder
}

// GetLoadBalancer returns whether the specified load balancer exists, and
@@ -387,16 +393,23 @@ func (l *loadbalancer) EnsureLoadBalancer(
return nil, err
}

// Refuse to do anything if there are no nodes
nodes, err := filterNodesBySelector(serviceInfo, nodes)
if err != nil {
return nil, err
}

if len(nodes) == 0 {
return nil, errors.New(
"no valid nodes for service found, please verify there is " +
"at least one that allows load balancers",
l.recorder.Event(
service,
v1.EventTypeWarning,
"NoValidNodes",
"No valid nodes for service found, "+
Contributor:
suggestion: can we include the configured value from serviceInfo.Service.Annotations here?
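
A rough sketch of what that could look like, assuming the `serviceInfo.annotation` helper from this PR is in scope at this point (illustration only, not a definitive implementation):

```go
// Surface the configured selector in the event so an operator can spot a
// typo directly from `kubectl describe service`.
l.recorder.Eventf(
	service,
	v1.EventTypeWarning,
	"NoValidNodes",
	"No valid nodes for service found with node selector %q, "+
		"double-check the %s annotation",
	serviceInfo.annotation(LoadBalancerNodeSelector),
	LoadBalancerNodeSelector,
)
```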

"double-check node-selector annotation",
)
}

// Reconcile
err := reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) {
err = reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) {
// Get the desired state from Kubernetes
servers, err := l.srv.mapNodes(ctx, nodes).All()
if err != nil {
@@ -442,6 +455,28 @@
return result, nil
}

func filterNodesBySelector(
serviceInfo *serviceInfo,
nodes []*v1.Node,
) ([]*v1.Node, error) {
selector := labels.Everything()
if v := serviceInfo.annotation(LoadBalancerNodeSelector); v != "" {
var err error
selector, err = labels.Parse(v)
if err != nil {
return nil, fmt.Errorf("unable to parse selector: %w", err)
}
}
selectedNodes := make([]*v1.Node, 0, len(nodes))
for _, node := range nodes {
if selector.Matches(labels.Set(node.Labels)) {
selectedNodes = append(selectedNodes, node)
}
}

return selectedNodes, nil
}

// UpdateLoadBalancer updates hosts under the specified load balancer.
// Implementations must treat the *v1.Service and *v1.Node
// parameters as read-only and not modify them.
@@ -461,6 +496,21 @@ func (l *loadbalancer) UpdateLoadBalancer(
return err
}

nodes, err := filterNodesBySelector(serviceInfo, nodes)
Contributor:
nit: introduce a new variable such as `filteredNodes` instead of reassigning the `nodes` parameter, if you agree.
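
Purely as an illustration of the nit, something along these lines (sketch only):

```go
// Keep the nodes parameter read-only and work on a filtered copy instead.
filteredNodes, err := filterNodesBySelector(serviceInfo, nodes)
if err != nil {
	return err
}

if len(filteredNodes) == 0 {
	// emit the NoValidNodes warning event as below
}

// ...and pass filteredNodes to l.srv.mapNodes further down.
```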

if err != nil {
return err
}

if len(nodes) == 0 {
l.recorder.Event(
service,
v1.EventTypeWarning,
"NoValidNodes",
"No valid nodes for service found, "+
"double-check node-selector annotation",
)
}

// Reconcile
return reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) {
// Get the desired state from Kubernetes
20 changes: 4 additions & 16 deletions pkg/cloudscale_ccm/reconcile.go
@@ -22,16 +22,13 @@ type lbState struct {

// Pool pointers are used to refer to members by pool, therefore use a
// pointer here as well, to not accidentally copy the struct.
pools []*cloudscale.LoadBalancerPool
members map[*cloudscale.LoadBalancerPool][]cloudscale.
LoadBalancerPoolMember
monitors map[*cloudscale.LoadBalancerPool][]cloudscale.
LoadBalancerHealthMonitor
pools []*cloudscale.LoadBalancerPool
members map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerPoolMember
monitors map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerHealthMonitor

// Though not currently used that way, listeners are not
// necessarily bound to any given pool.
listeners map[*cloudscale.LoadBalancerPool][]cloudscale.
LoadBalancerListener
listeners map[*cloudscale.LoadBalancerPool][]cloudscale.LoadBalancerListener

// The assigned floating IPs
floatingIPs []string
@@ -201,15 +198,6 @@ func desiredLbState(
}
}

// If there are no pool members, return an error. It would be possible
// to just put a load balancer up that has no function, but it seems
// more useful to err instead, as there's likely something wrong.
if len(s.members[&pool]) == 0 {
return nil, fmt.Errorf(
"service %s: no private address found on any node",
serviceInfo.Service.Name)
}

// Add a health monitor for each pool
monitor, err := healthMonitorForPort(serviceInfo)
if err != nil {
4 changes: 2 additions & 2 deletions pkg/cloudscale_ccm/reconcile_test.go
@@ -1156,10 +1156,10 @@ func TestLimitSubnets(t *testing.T) {
assert.Equal(t, "10.0.1.1", state.members[state.pools[0]][0].Address)
assert.Equal(t, "10.0.1.2", state.members[state.pools[0]][1].Address)

// If we have no valid addresses, we get an error
// If we have no valid addresses, we get no error
Contributor:
Since a typo in a label selector now causes immediate downtime (by wiping the pool) instead of a "fail-safe" error, could we ensure this behavioral change is clearly highlighted in the documentation and release notes?

s.Annotations[LoadBalancerListenerAllowedSubnets] = `
["00000000-0000-0000-0000-000000000003"]`

_, err = desiredLbState(i, nodes, servers)
assert.Error(t, err)
assert.NoError(t, err)
Contributor:
IMHO unit tests for the new annotations should be added to this file.
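
For instance, something along these lines could exercise the new selector filtering. This is a sketch only; the `serviceInfo` construction, helpers, and the chosen invalid-selector string are assumptions and would need to match the fixtures actually used in this package:

```go
func TestFilterNodesBySelector(t *testing.T) {
	nodes := []*v1.Node{
		{ObjectMeta: metav1.ObjectMeta{
			Name:   "worker-1",
			Labels: map[string]string{"kubernetes.io/hostname": "worker-1"},
		}},
		{ObjectMeta: metav1.ObjectMeta{
			Name:   "worker-2",
			Labels: map[string]string{"kubernetes.io/hostname": "worker-2"},
		}},
	}

	// Assumes serviceInfo can be built directly around the Service; the
	// existing tests may provide a helper for this instead.
	svc := &v1.Service{ObjectMeta: metav1.ObjectMeta{
		Annotations: map[string]string{
			LoadBalancerNodeSelector: "kubernetes.io/hostname=worker-2",
		},
	}}
	info := &serviceInfo{Service: svc}

	// Only the matching node should remain in the pool candidates.
	filtered, err := filterNodesBySelector(info, nodes)
	assert.NoError(t, err)
	assert.Len(t, filtered, 1)
	assert.Equal(t, "worker-2", filtered[0].Name)

	// An unparsable selector should surface an error rather than silently
	// selecting nothing.
	svc.Annotations[LoadBalancerNodeSelector] = "x==a==b"
	_, err = filterNodesBySelector(info, nodes)
	assert.Error(t, err)
}
```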

}
2 changes: 2 additions & 0 deletions pkg/cloudscale_ccm/service_info.go
@@ -118,6 +118,8 @@ func (s serviceInfo) annotation(key string) string {
return s.annotationOrDefault(key, "50000")
case LoadBalancerListenerAllowedSubnets:
return s.annotationOrDefault(key, "[]")
case LoadBalancerNodeSelector:
return s.annotationOrDefault(key, "")
default:
return s.annotationOrElse(key, func() string {
klog.Warning("unknown annotation:", key)