Skip to content

Commit 38cdb32

Browse files
Improved pod lifecycle management. (#27)
* Extract the pod if it's being deleted to avoid a future race condition during processing when the event is pulled off the work queue. * When processing events, look back to the owning deployment to understand if the pod's replica count is being changed to avoid excessive requests * added best effort cache to remove redundant requests to the API * use a slightly better separator * clarified delete events with update to the deployment spec * added comments
1 parent ea95436 commit 38cdb32

2 files changed

Lines changed: 150 additions & 34 deletions

File tree

deploy/manifest.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ rules:
1717
- apiGroups: [""]
1818
resources: ["pods"]
1919
verbs: ["get", "list", "watch"]
20+
- apiGroups: ["apps"]
21+
resources: ["deployments"]
22+
verbs: ["get"]
2023
---
2124
apiVersion: rbac.authorization.k8s.io/v1
2225
kind: ClusterRoleBinding

internal/controller/controller.go

Lines changed: 147 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@ import (
66
"fmt"
77
"log/slog"
88
"strings"
9+
"sync"
910
"time"
1011

1112
"github.com/github/deployment-tracker/pkg/deploymentrecord"
1213
"github.com/github/deployment-tracker/pkg/image"
1314
"github.com/github/deployment-tracker/pkg/metrics"
1415

1516
corev1 "k8s.io/api/core/v1"
17+
k8serrors "k8s.io/apimachinery/pkg/api/errors"
18+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1619
"k8s.io/apimachinery/pkg/util/runtime"
1720
"k8s.io/apimachinery/pkg/util/wait"
1821
"k8s.io/client-go/informers"
@@ -21,10 +24,18 @@ import (
2124
"k8s.io/client-go/util/workqueue"
2225
)
2326

27+
const (
28+
// EventCreated indicates that a pod has been created.
29+
EventCreated = "CREATED"
30+
// EventDeleted indicates that a pod has been deleted.
31+
EventDeleted = "DELETED"
32+
)
33+
2434
// PodEvent represents a pod event to be processed.
2535
type PodEvent struct {
26-
Key string
27-
EventType string
36+
Key string
37+
EventType string
38+
DeletedPod *corev1.Pod // Only populated for delete events
2839
}
2940

3041
// Controller is the Kubernetes controller for tracking deployments.
@@ -34,6 +45,10 @@ type Controller struct {
3445
workqueue workqueue.TypedRateLimitingInterface[PodEvent]
3546
apiClient *deploymentrecord.Client
3647
cfg *Config
48+
// best effort cache to avoid redundant posts
49+
// post requests are idempotent, so if this cache fails due to
50+
// restarts or other events, nothing will break.
51+
observedDeployments sync.Map
3752
}
3853

3954
// New creates a new deployment tracker controller.
@@ -98,8 +113,9 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro
98113
return
99114
}
100115

101-
// Only process pods that are running
102-
if pod.Status.Phase == corev1.PodRunning {
116+
// Only process pods that are running and belong
117+
// to a deployment
118+
if pod.Status.Phase == corev1.PodRunning && getDeploymentName(pod) != "" {
103119
key, err := cache.MetaNamespaceKeyFunc(obj)
104120

105121
// For our purposes, there are in practice
@@ -108,7 +124,7 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro
108124
if err == nil {
109125
queue.Add(PodEvent{
110126
Key: key,
111-
EventType: "CREATED",
127+
EventType: EventCreated,
112128
})
113129
}
114130
}
@@ -129,8 +145,9 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro
129145
return
130146
}
131147

132-
// Skip if pod is being deleted
133-
if newPod.DeletionTimestamp != nil {
148+
// Skip if pod is being deleted or doesn't belong
149+
// to a deployment
150+
if newPod.DeletionTimestamp != nil || getDeploymentName(newPod) == "" {
134151
return
135152
}
136153

@@ -149,38 +166,43 @@ func New(clientset kubernetes.Interface, namespace string, cfg *Config) (*Contro
149166
if err == nil {
150167
queue.Add(PodEvent{
151168
Key: key,
152-
EventType: "CREATED",
169+
EventType: EventCreated,
153170
})
154171
}
155172
}
156173
},
157174
DeleteFunc: func(obj any) {
158-
_, ok := obj.(*corev1.Pod)
175+
pod, ok := obj.(*corev1.Pod)
159176
if !ok {
160177
// Handle deleted final state unknown
161178
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
162179
if !ok {
163180
return
164181
}
165-
_, ok = tombstone.Obj.(*corev1.Pod)
182+
pod, ok = tombstone.Obj.(*corev1.Pod)
166183
if !ok {
167184
return
168185
}
169186
}
170-
key, err := cache.MetaNamespaceKeyFunc(obj)
171187

188+
// Only process pods that belong to a deployment
189+
if getDeploymentName(pod) == "" {
190+
return
191+
}
192+
193+
key, err := cache.MetaNamespaceKeyFunc(obj)
172194
// For our purposes, there are in practice
173195
// no error event we care about, so don't
174196
// bother with handling it.
175197
if err == nil {
176198
queue.Add(PodEvent{
177-
Key: key,
178-
EventType: "DELETED",
199+
Key: key,
200+
EventType: EventDeleted,
201+
DeletedPod: pod,
179202
})
180203
}
181204
},
182205
})
183-
184206
if err != nil {
185207
return nil, fmt.Errorf("failed to add event handlers: %w", err)
186208
}
@@ -261,30 +283,63 @@ func (c *Controller) processNextItem(ctx context.Context) bool {
261283

262284
// processEvent processes a single pod event.
263285
func (c *Controller) processEvent(ctx context.Context, event PodEvent) error {
264-
// Get the pod from the informer's cache
265-
obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key)
266-
if err != nil {
267-
slog.Error("Failed to get pod from cache",
268-
"key", event.Key,
269-
"error", err,
270-
)
271-
return nil
272-
}
273-
if !exists {
274-
// Pod no longer exists in cache, skip processing
275-
return nil
276-
}
286+
var pod *corev1.Pod
287+
288+
if event.EventType == EventDeleted {
289+
// For delete events, use the pod captured at deletion time
290+
pod = event.DeletedPod
291+
if pod == nil {
292+
slog.Error("Delete event missing pod data",
293+
"key", event.Key,
294+
)
295+
return nil
296+
}
277297

278-
pod, ok := obj.(*corev1.Pod)
279-
if !ok {
280-
slog.Error("Invalid object type in cache",
281-
"key", event.Key,
282-
)
283-
return nil
298+
// Check if the parent deployment still exists
299+
// If it does, this is just a scale-down event, skip it.
300+
//
301+
// If a deployment changes image versions, this will not
302+
// fire delete/decommissioned events to the remote API.
303+
// This is as intended, as the server will keep track of
304+
// the (cluster unique) deployment name, and just update
305+
// the referenced image digest to the newly observed (via
306+
// the create event).
307+
deploymentName := getDeploymentName(pod)
308+
if deploymentName != "" && c.deploymentExists(ctx, pod.Namespace, deploymentName) {
309+
slog.Debug("Deployment still exists, skipping pod delete (scale down)",
310+
"namespace", pod.Namespace,
311+
"deployment", deploymentName,
312+
"pod", pod.Name,
313+
)
314+
return nil
315+
}
316+
} else {
317+
// For create events, get the pod from the informer's cache
318+
obj, exists, err := c.podInformer.GetIndexer().GetByKey(event.Key)
319+
if err != nil {
320+
slog.Error("Failed to get pod from cache",
321+
"key", event.Key,
322+
"error", err,
323+
)
324+
return nil
325+
}
326+
if !exists {
327+
// Pod no longer exists in cache, skip processing
328+
return nil
329+
}
330+
331+
var ok bool
332+
pod, ok = obj.(*corev1.Pod)
333+
if !ok {
334+
slog.Error("Invalid object type in cache",
335+
"key", event.Key,
336+
)
337+
return nil
338+
}
284339
}
285340

286341
status := deploymentrecord.StatusDeployed
287-
if event.EventType == "DELETED" {
342+
if event.EventType == EventDeleted {
288343
status = deploymentrecord.StatusDecommissioned
289344
}
290345

@@ -307,6 +362,25 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error {
307362
return lastErr
308363
}
309364

365+
// deploymentExists checks if a deployment exists in the cluster.
366+
func (c *Controller) deploymentExists(ctx context.Context, namespace, name string) bool {
367+
_, err := c.clientset.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
368+
if err != nil {
369+
if k8serrors.IsNotFound(err) {
370+
return false
371+
}
372+
// On error, assume it exists to be safe
373+
// (avoid false decommissions)
374+
slog.Warn("Failed to check if deployment exists, assuming it does",
375+
"namespace", namespace,
376+
"deployment", name,
377+
"error", err,
378+
)
379+
return true
380+
}
381+
return true
382+
}
383+
310384
// recordContainer records a single container's deployment info.
311385
func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string) error {
312386
dn := getARDeploymentName(pod, container, c.cfg.Template)
@@ -323,6 +397,31 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta
323397
return nil
324398
}
325399

400+
cacheKey := getCacheKey(dn, digest)
401+
402+
// Check if we've already recorded this deployment
403+
switch status {
404+
case deploymentrecord.StatusDeployed:
405+
if _, exists := c.observedDeployments.Load(cacheKey); exists {
406+
slog.Debug("Deployment already observed, skipping post",
407+
"deployment_name", dn,
408+
"digest", digest,
409+
)
410+
return nil
411+
}
412+
case deploymentrecord.StatusDecommissioned:
413+
// For delete, check if we've seen it - if not, no need to decommission
414+
if _, exists := c.observedDeployments.Load(cacheKey); !exists {
415+
slog.Debug("Deployment not in cache, skipping decommission",
416+
"deployment_name", dn,
417+
"digest", digest,
418+
)
419+
return nil
420+
}
421+
default:
422+
return fmt.Errorf("invalid status: %s", status)
423+
}
424+
326425
// Extract image name and tag
327426
imageName, version := image.ExtractName(container.Image)
328427

@@ -372,9 +471,23 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta
372471
"digest", record.Digest,
373472
)
374473

474+
// Update cache after successful post
475+
switch status {
476+
case deploymentrecord.StatusDeployed:
477+
c.observedDeployments.Store(cacheKey, true)
478+
case deploymentrecord.StatusDecommissioned:
479+
c.observedDeployments.Delete(cacheKey)
480+
default:
481+
return fmt.Errorf("invalid status: %s", status)
482+
}
483+
375484
return nil
376485
}
377486

487+
func getCacheKey(dn, digest string) string {
488+
return dn + "||" + digest
489+
}
490+
378491
// getARDeploymentName converts the pod's metadata into the correct format
379492
// for the deployment name for the artifact registry (this is not the same
380493
// as the K8s deployment's name!

0 commit comments

Comments
 (0)