Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ jobs:
-f manifests/serviceaccount.yaml \
-f manifests/netpol.yaml

- name: Label namespace
run: |
kubectl label namespace anray-liu app.kubernetes.io/part-of=kubeflow-profile

- name: Apply configs
run: |
kubectl apply -f manifests/controller/controller_config.yaml \
Expand Down Expand Up @@ -89,6 +93,9 @@ jobs:
run: |
kubectl apply -f testing/sts.yaml
sleep 5
kubectl logs -l app=volume-cleaner-controller -n das --tail 500
kubectl get pvc pvc1 -n anray-liu -o yaml
kubectl get ns anray-liu -o yaml

- name: Test label removed
run: |
Expand Down
1 change: 1 addition & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func main() {

cfg := structInternal.ControllerConfig{
Namespace: os.Getenv("NAMESPACE"),
NsLabel: os.Getenv("NS_LABEL"),
TimeLabel: os.Getenv("TIME_LABEL"),
NotifLabel: os.Getenv("NOTIF_LABEL"),
TimeFormat: os.Getenv("TIME_FORMAT"),
Expand Down
1 change: 1 addition & 0 deletions cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func main() {
// there is also a config for the controller
cfg := structInternal.SchedulerConfig{
Namespace: os.Getenv("NAMESPACE"),
NsLabel: os.Getenv("NS_LABEL"),
TimeLabel: os.Getenv("TIME_LABEL"),
NotifLabel: os.Getenv("NOTIF_LABEL"),
TimeFormat: os.Getenv("TIME_FORMAT"),
Expand Down
142 changes: 74 additions & 68 deletions internal/kubernetes/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,101 +34,107 @@ func FindStale(kube kubernetes.Interface, cfg structInternal.SchedulerConfig) (i
log.Print("[INFO] Scanning for stale PVCS...")

// iterate through all pvcs in configured namespace(s)

for _, pvc := range PvcList(kube, cfg.Namespace) {
log.Printf("[INFO] Found PVC %s from NS %s", pvc.Name, pvc.Namespace)

// check if label exists (meaning pvc is unattached)
// if pvc is attached to a sts, it would've had its label removed by the controller

timestamp, ok := pvc.Labels[cfg.TimeLabel]
if !ok {
log.Printf("[INFO] Label %s not found. Skipping.", cfg.TimeLabel)
for _, ns := range NsList(kube, cfg.NsLabel) {
// skip if not in configured namespace
if ns.Name != cfg.Namespace && cfg.Namespace != "" {
continue
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This git diff looks weird but all I've done is wrap the red code inside the block above

}

// check if pvc should be deleted
stale, staleError := IsStale(timestamp, cfg.TimeFormat, cfg.GracePeriod)
if staleError != nil {
log.Printf("[ERROR] Failed to parse timestamp: %s", staleError)
errCount++
continue
}
for _, pvc := range PvcList(kube, ns.Name) {
log.Printf("[INFO] Found PVC %s from NS %s", pvc.Name, pvc.Namespace)

// stale means grace period has passed, can be deleted
if stale {
if cfg.DryRun {
log.Printf("[DRY RUN] Delete PVC %s", pvc.Name)
deleteCount++
// check if label exists (meaning pvc is unattached)
// if pvc is attached to a sts, it would've had its label removed by the controller

timestamp, ok := pvc.Labels[cfg.TimeLabel]
if !ok {
log.Printf("[INFO] Label %s not found. Skipping.", cfg.TimeLabel)
continue
}

err := kube.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, metav1.DeleteOptions{})
if err != nil {
log.Printf("[ERROR] Failed to delete PVC %s: %s", pvc.Name, err)
// check if pvc should be deleted
stale, staleError := IsStale(timestamp, cfg.TimeFormat, cfg.GracePeriod)
if staleError != nil {
log.Printf("[ERROR] Failed to parse timestamp: %s", staleError)
errCount++
continue
}

log.Print("[INFO] PVC successfully deleted.")
deleteCount++

} else {
// not stale yet, handle email logic here

log.Print("[INFO] Grace period not passed.")
// stale means grace period has passed, can be deleted
if stale {
if cfg.DryRun {
log.Printf("[DRY RUN] Delete PVC %s", pvc.Name)
deleteCount++
continue
}

notifCount, ok := pvc.Labels[cfg.NotifLabel]
if !ok {
log.Printf("[INFO] Label %s not found. Skipping.", cfg.NotifLabel)
errCount++
continue
}
err := kube.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, metav1.DeleteOptions{})
if err != nil {
log.Printf("[ERROR] Failed to delete PVC %s: %s", pvc.Name, err)
errCount++
continue
}

currNotif, countErr := strconv.Atoi(notifCount)
if countErr != nil {
log.Printf("[ERROR] Failed to parse notification count: %v", countErr)
errCount++
continue
}
log.Print("[INFO] PVC successfully deleted.")
deleteCount++

if len(cfg.NotifTimes) == 0 {
continue
}
} else {
// not stale yet, handle email logic here

shouldSend, mailError := ShouldSendMail(timestamp, currNotif, cfg)
if mailError != nil {
log.Printf("[ERROR] Failed to parse timestamp: %s", mailError)
errCount++
continue
}
log.Print("[INFO] Grace period not passed.")

if shouldSend {
if cfg.DryRun {
log.Print("[DRY RUN] Email owner.")
emailCount++
notifCount, ok := pvc.Labels[cfg.NotifLabel]
if !ok {
log.Printf("[INFO] Label %s not found. Skipping.", cfg.NotifLabel)
errCount++
continue
}

// personal consists of details passed into the email template as variables while email is
// the email address that is consistent regardless of the template
currNotif, countErr := strconv.Atoi(notifCount)
if countErr != nil {
log.Printf("[ERROR] Failed to parse notification count: %v", countErr)
errCount++
continue
}

email, personal := utilsInternal.EmailDetails(kube, pvc, cfg.GracePeriod)
if len(cfg.NotifTimes) == 0 {
continue
}

err := utilsInternal.SendNotif(client, cfg.EmailCfg, email, personal)
if err != nil {
log.Printf("[Error] Unable to send an email to %s at %s", personal.Name, email)
shouldSend, mailError := ShouldSendMail(timestamp, currNotif, cfg)
if mailError != nil {
log.Printf("[ERROR] Failed to parse timestamp: %s", mailError)
errCount++
continue
}

// Update Email Count
emailCount++
if shouldSend {
if cfg.DryRun {
log.Print("[DRY RUN] Email owner.")
emailCount++
continue
}

// personal consists of details passed into the email template as variables while email is
// the email address that is consistent regardless of the template

// Increment notification count by 1
newNotifCount := strconv.Itoa(currNotif + 1)
SetPvcLabel(kube, cfg.NotifLabel, newNotifCount, pvc.Namespace, pvc.Name)
email, personal := utilsInternal.EmailDetails(kube, pvc, cfg.GracePeriod)

err := utilsInternal.SendNotif(client, cfg.EmailCfg, email, personal)
if err != nil {
log.Printf("[Error] Unable to send an email to %s at %s", personal.Name, email)
errCount++
continue
}

// Update Email Count
emailCount++

// Increment notification count by 1
newNotifCount := strconv.Itoa(currNotif + 1)
SetPvcLabel(kube, cfg.NotifLabel, newNotifCount, pvc.Namespace, pvc.Name)

}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/kubernetes/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (

// returns a slice of corev1.Namespace structs

func NsList(kube kubernetes.Interface) []corev1.Namespace {
func NsList(kube kubernetes.Interface, label string) []corev1.Namespace {
ns, err := kube.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{
LabelSelector: "app.kubernetes.io/part-of=kubeflow-profile",
LabelSelector: label,
})
if err != nil {
// nothing can be done without namespaces so crash the program
Expand Down Expand Up @@ -74,7 +74,7 @@ func FindUnattachedPVCs(kube kubernetes.Interface, cfg structInternal.Controller

log.Print("[INFO] Scanning namespaces...")

for _, namespace := range NsList(kube) {
for _, namespace := range NsList(kube, cfg.NsLabel) {
// skip if not in configured namespace
if namespace.Name != cfg.Namespace && cfg.Namespace != "" {
continue
Expand Down
20 changes: 19 additions & 1 deletion internal/kubernetes/retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func TestNsList(t *testing.T) {
}
}

list := NsList(kube)
// check with correct label

list := NsList(kube, "app.kubernetes.io/part-of=kubeflow-profile")

// check right length
assert.Equal(t, len(list), len(names))
Expand All @@ -41,6 +43,22 @@ func TestNsList(t *testing.T) {
assert.Equal(t, ns.Name, names[i])
}

// check with incorrect label

list = NsList(kube, "bad-label")

assert.Equal(t, len(list), 0)

// check with empty label

list = NsList(kube, "")

assert.Equal(t, len(list), len(names))

for i, ns := range list {
assert.Equal(t, ns.Name, names[i])
}

})
}

Expand Down
83 changes: 53 additions & 30 deletions internal/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
// standard packages
"context"
"log"
"sync"
"time"

// external packages
Expand All @@ -19,45 +20,62 @@ import (
// Watches for when statefulsets are created or deleted

func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal.ControllerConfig) {
watcher, err := kube.AppsV1().StatefulSets(cfg.Namespace).Watch(ctx, metav1.ListOptions{})
if err != nil {
log.Fatalf("[ERROR] Failed to create watcher for statefulsets: %s", err)
}
var wg sync.WaitGroup

// iterate through all pvcs in configured namespace(s)
for _, ns := range NsList(kube, cfg.NsLabel) {
// skip if not in configured namespace
if ns.Name != cfg.Namespace && cfg.Namespace != "" {
continue
}

log.Print("[INFO] Watching for statefulset events...")
wg.Add(1)
go func() {
defer log.Printf("[INFO] Watcher for NS %s finished.", ns.Name)
defer wg.Done()

// create a channel to capture sts events in the cluster
events := watcher.ResultChan()
watcher, err := kube.AppsV1().StatefulSets(ns.Name).Watch(ctx, metav1.ListOptions{})
if err != nil {
log.Fatalf("[ERROR] Failed to create watcher for statefulsets: %s", err)
}

for {
select {
log.Printf("[INFO] Watching NS %s for statefulset events...", ns.Name)

// context used to kill loop
// used during unit tests
case <-ctx.Done():
return
// create a channel to capture sts events in the cluster
events := watcher.ResultChan()

// sts was added or deleted
case event := <-events:
sts, ok := event.Object.(*appsv1.StatefulSet)
for {
select {

// Skip this event if it can't be parsed into a sts
if !ok {
continue
}
// context used to kill loop
// used during unit tests
case <-ctx.Done():
return

// sts was added or deleted
case event := <-events:
sts, ok := event.Object.(*appsv1.StatefulSet)

switch event.Type {
// Skip this event if it can't be parsed into a sts
if !ok {
continue
}

case watch.Added:
// sts added
handleAdded(kube, cfg, sts)
case watch.Deleted:
// sts deleted
handleDeleted(kube, cfg, sts)
switch event.Type {

case watch.Added:
// sts added
handleAdded(kube, cfg, sts)
case watch.Deleted:
// sts deleted
handleDeleted(kube, cfg, sts)
}
}
}
}
}
}()

wg.Wait()
}
}

// scan performed on controller startup to find unattached pvcs and assign labels to them
Expand Down Expand Up @@ -89,7 +107,12 @@ func InitialScan(kube kubernetes.Interface, cfg structInternal.ControllerConfig)
func ResetLabels(kube kubernetes.Interface, cfg structInternal.ControllerConfig) {
log.Print("Resetting labels...")

for _, namespace := range NsList(kube) {
for _, namespace := range NsList(kube, cfg.NsLabel) {
// skip if not in configured namespace
if namespace.Name != cfg.Namespace && cfg.Namespace != "" {
continue
}

for _, pvc := range PvcList(kube, namespace.Name) {
_, ok := pvc.Labels[cfg.TimeLabel]
if ok {
Expand Down
2 changes: 2 additions & 0 deletions internal/structure/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ API_KEY: "Random APIKEY",

type ControllerConfig struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why we don't extract the common fields into their own shared config? Or why not have 1 config struct?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The controller and scheduler were designed to be as loosely coupled as possible, so we didn't want to mix the configs. It would also be unnecessarily complicated to create another struct for just a few shared values.

Namespace string
NsLabel string
TimeLabel string
NotifLabel string
TimeFormat string
Expand All @@ -38,6 +39,7 @@ type ControllerConfig struct {

type SchedulerConfig struct {
Namespace string
NsLabel string
TimeLabel string
NotifLabel string
TimeFormat string
Expand Down
Loading
Loading