Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ FROM ${GO_IMAGE}

WORKDIR /go/src/github.com/digitalocean/flipop

ENV GOPATH /go
ENV GOPATH=/go
COPY . /go/src/github.com/digitalocean/flipop

RUN go mod download
Expand Down
77 changes: 70 additions & 7 deletions pkg/floatingip/floatingippool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package floatingip

import (
"context"
"errors"
"fmt"
"reflect"
"sync"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubetypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
Expand All @@ -41,6 +43,9 @@ import (
)

const (
// NodeAddressTypeReservedIP is a value for use in the node.Status.Addresses[].Type field.
NodeAddressTypeReservedIP = "ReservedIP"

floatingIPPoolResyncPeriod = 5 * time.Minute
)

Expand Down Expand Up @@ -269,14 +274,72 @@ func (c *Controller) statusUpdater(log logrus.FieldLogger, name, namespace strin
log.WithError(err).Error("loading FloatingIPPool status")
return fmt.Errorf("loading FloatingIPPool: %w", err)
}
if reflect.DeepEqual(status, k8s.Status) {
return nil
if !reflect.DeepEqual(status, k8s.Status) {
k8s.Status = status
_, err = c.flipopCS.FlipopV1alpha1().FloatingIPPools(k8s.Namespace).UpdateStatus(ctx, k8s, metav1.UpdateOptions{})
if err != nil {
log.WithError(err).Error("updating FloatingIPPool status")
return fmt.Errorf("updating FloatingIPPool status: %w", err)
}
}
k8s.Status = status
_, err = c.flipopCS.FlipopV1alpha1().FloatingIPPools(k8s.Namespace).UpdateStatus(ctx, k8s, metav1.UpdateOptions{})
if err != nil {
log.WithError(err).Error("updating FloatingIPPool status")
return fmt.Errorf("updating FloatingIPPool status: %w", err)

c.poolLock.Lock()
pool, ok := c.pools[k8s.GetUID()]
if !ok {
c.poolLock.Unlock()
log.WithError(err).Error("failed to find pool")
return errors.New("failed to find pool")
}
nodes := pool.matchController.GetAllNodes()
c.poolLock.Unlock()

nodeToIPs := make(map[string]string)
for ip, ipStatus := range status.IPs {
if ipStatus.NodeName == "" {
continue
}
nodeToIPs[ipStatus.NodeName] = ip
}

for _, n := range nodes {
var updatedAddrs []corev1.NodeAddress
var updateNeeded bool
for _, addr := range n.Status.Addresses {
if addr.Type != NodeAddressTypeReservedIP {
updatedAddrs = append(updatedAddrs, addr)
continue
}
// IPs can be registered to other floating IP pools. If we don't know the IP, skip.
if _, ok := status.IPs[addr.Address]; !ok {
updatedAddrs = append(updatedAddrs, addr)
continue
}
if addr.Address == nodeToIPs[n.Name] {
// The node is properly configured.
continue
}
// This address is stale, flag the node for update and don't include in updatedAddrs.
updateNeeded = true
}
ip, ok := nodeToIPs[n.Name]
if ok {
updatedAddrs = append(updatedAddrs, corev1.NodeAddress{
Type: NodeAddressTypeReservedIP,
Address: ip,
})
updateNeeded = true
}
if !updateNeeded {
log.Debugf("node %q status unchanged", n.Name)
continue
}
n.Status.Addresses = updatedAddrs
log.Infof("updating node %q address status", n.Name)
_, err := c.kubeCS.CoreV1().Nodes().UpdateStatus(ctx, n, metav1.UpdateOptions{})
if err != nil {
log.WithError(err).WithField("node", n.Name).Error("updating node status")
return fmt.Errorf("updating node status: %w", err)
}
}
return nil
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/nodematch/match_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,16 @@ func (m *Controller) OnDelete(obj interface{}) {
}
}

// GetNodeByName retrieves a Kubernetes node resource from the informer.
func (m *Controller) GetAllNodes() []*corev1.Node {
nodes := m.nodeInformer.GetIndexer().List()
var out []*corev1.Node
for _, n := range nodes {
out = append(out, n.(*corev1.Node).DeepCopy())
}
return out
}

type node struct {
k8sNode *corev1.Node
isNodeMatch bool
Expand Down