From 0eb7ec2be17e15692ff40a7e71ef09e43b4271fa Mon Sep 17 00:00:00 2001 From: LightCreator1007 Date: Wed, 20 May 2026 19:57:01 +0530 Subject: [PATCH 1/6] fix(reporter):added idempotency gate to prevent API server flooding --- cmd/readiness-condition-reporter/main.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/cmd/readiness-condition-reporter/main.go b/cmd/readiness-condition-reporter/main.go index 477cb8a..931ca16 100644 --- a/cmd/readiness-condition-reporter/main.go +++ b/cmd/readiness-condition-reporter/main.go @@ -215,8 +215,12 @@ func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeN // Find existing condition to preserve transition time if status hasn't changed var transitionTime metav1.Time + var existingCondition *corev1.NodeCondition + for _, condition := range node.Status.Conditions { if string(condition.Type) == conditionType { + condCopy := condition + existingCondition = &condCopy if condition.Status == status { transitionTime = condition.LastTransitionTime } @@ -224,6 +228,13 @@ func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeN } } + // If the semantic state is completely unchanged, bypass the API write + // to prevent etcd write amplification and control plane flooding. + if existingCondition != nil && existingCondition.Status == status && existingCondition.Reason == health.Reason && existingCondition.Message == health.Message { + // since state did not change return early to avoid unnecessary API call + return nil + } + if transitionTime.IsZero() { transitionTime = now } From 80cc5f56129ea3f19a2249626f857e55109ff979 Mon Sep 17 00:00:00 2001 From: LightCreator1007 Date: Thu, 21 May 2026 15:34:17 +0530 Subject: [PATCH 2/6] updated idempotency gate to include 5min heartbeat --- cmd/readiness-condition-reporter/main.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/cmd/readiness-condition-reporter/main.go b/cmd/readiness-condition-reporter/main.go index 931ca16..009cc14 100644 --- a/cmd/readiness-condition-reporter/main.go +++ b/cmd/readiness-condition-reporter/main.go @@ -230,8 +230,20 @@ func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeN // If the semantic state is completely unchanged, bypass the API write // to prevent etcd write amplification and control plane flooding. + needsUpdate := true if existingCondition != nil && existingCondition.Status == status && existingCondition.Reason == health.Reason && existingCondition.Message == health.Message { - // since state did not change return early to avoid unnecessary API call + needsUpdate = false + /* + NOTE: Skipping the write stops refreshing the LastHeartbeatTime on every tick. + To mitigate this, force an update every 5 minutes even if the state is unchanged. + */ + if time.Since(existingCondition.LastHeartbeatTime.Time) >= 5*time.Minute { + needsUpdate = true + } + } + + if !needsUpdate { + //state has not changed for 5 mins, skip the write return nil } From 6bea06114558e018567ebcf646cd4c53ed5f8c69 Mon Sep 17 00:00:00 2001 From: LightCreator1007 Date: Thu, 21 May 2026 15:42:00 +0530 Subject: [PATCH 3/6] test: add idempotency and heartbeat coverage for status updates --- cmd/readiness-condition-reporter/main_test.go | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/cmd/readiness-condition-reporter/main_test.go b/cmd/readiness-condition-reporter/main_test.go index 8706e86..90b0781 100644 --- a/cmd/readiness-condition-reporter/main_test.go +++ b/cmd/readiness-condition-reporter/main_test.go @@ -188,3 +188,101 @@ func TestUpdateNodeCondition(t *testing.T) { }) } } + +func TestUpdateNodeCondition_SkipsWriteWhenStateUnchanged(t *testing.T) { + nodeName := "test-node" + conditionType := "TestCondition" + health := &HealthResponse{Healthy: true, Reason: "EndpointOK", Message: "All good"} + + countUpdates := func(c *fake.Clientset) int { + n := 0 + for _, a := range c.Actions() { + if a.GetVerb() == "update" && a.GetSubresource() == "status" && a.GetResource().Resource == "nodes" { + n++ + } + } + return n + } + + // Part 1: Fresh heartbeat, state unchanged -> skip the API write. + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: nodeName}, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeConditionType(conditionType), + Status: corev1.ConditionTrue, + Reason: "EndpointOK", + Message: "All good", + LastHeartbeatTime: metav1.NewTime(time.Now()), + LastTransitionTime: metav1.NewTime(time.Now()), + }, + }, + }, + } + client := fake.NewSimpleClientset(node) + + if err := updateNodeCondition(context.Background(), client, nodeName, conditionType, health); err != nil { + t.Fatalf("fresh heartbeat: updateNodeCondition() error = %v", err) + } + + if got := countUpdates(client); got != 0 { + t.Errorf("fresh heartbeat: expected 0 UpdateStatus calls (skip), got %d", got) + } + + fetchedNode, err := client.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("fresh heartbeat: failed to fetch node: %v", err) + } + for _, c := range fetchedNode.Status.Conditions { + if string(c.Type) == conditionType { + if c.LastHeartbeatTime.Time != node.Status.Conditions[0].LastHeartbeatTime.Time { + t.Errorf("fresh heartbeat: heartbeat was mutated despite skip; got %v, want %v", + c.LastHeartbeatTime.Time, node.Status.Conditions[0].LastHeartbeatTime.Time) + } + break + } + } + + // Part 2: Stale heartbeat (>=5min), state unchanged -> force the API write. + staleTime := time.Now().Add(-6 * time.Minute) + staleNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: nodeName}, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeConditionType(conditionType), + Status: corev1.ConditionTrue, + Reason: "EndpointOK", + Message: "All good", + LastHeartbeatTime: metav1.NewTime(staleTime), + LastTransitionTime: metav1.NewTime(staleTime), + }, + }, + }, + } + staleClient := fake.NewSimpleClientset(staleNode) + + if err := updateNodeCondition(context.Background(), staleClient, nodeName, conditionType, health); err != nil { + t.Fatalf("stale heartbeat: updateNodeCondition() error = %v", err) + } + + if got := countUpdates(staleClient); got != 1 { + t.Errorf("stale heartbeat: expected 1 UpdateStatus call (forced refresh), got %d", got) + } + + refreshedNode, err := staleClient.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("stale heartbeat: failed to fetch node: %v", err) + } + for _, c := range refreshedNode.Status.Conditions { + if string(c.Type) == conditionType { + if !c.LastHeartbeatTime.After(staleTime) { + t.Errorf("stale heartbeat: expected heartbeat refreshed past %v, got %v", + staleTime, c.LastHeartbeatTime.Time) + } + return + } + } + t.Fatal("stale heartbeat: condition not found after forced refresh") +} From ea637a9db838d99233244e34413b6379cb61a6fc Mon Sep 17 00:00:00 2001 From: LightCreator1007 Date: Thu, 21 May 2026 15:44:29 +0530 Subject: [PATCH 4/6] fixed linting errors --- cmd/readiness-condition-reporter/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/readiness-condition-reporter/main.go b/cmd/readiness-condition-reporter/main.go index 009cc14..fd9fa1d 100644 --- a/cmd/readiness-condition-reporter/main.go +++ b/cmd/readiness-condition-reporter/main.go @@ -243,7 +243,7 @@ func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeN } if !needsUpdate { - //state has not changed for 5 mins, skip the write + // state has not changed for 5 mins, skip the write return nil } From 4669574e64c026295beb396410fd0010d45faada Mon Sep 17 00:00:00 2001 From: LightCreator1007 Date: Mon, 25 May 2026 23:07:38 +0530 Subject: [PATCH 5/6] addedconfigurable heartbeat period and logging in case of skip scenario --- cmd/readiness-condition-reporter/main.go | 44 ++++++++++++++++-------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/cmd/readiness-condition-reporter/main.go b/cmd/readiness-condition-reporter/main.go index fd9fa1d..6293752 100644 --- a/cmd/readiness-condition-reporter/main.go +++ b/cmd/readiness-condition-reporter/main.go @@ -35,13 +35,15 @@ import ( ) const ( - envNodeName = "NODE_NAME" - envConditionType = "CONDITION_TYPE" - envCheckEndpoint = "CHECK_ENDPOINT" - envCheckInterval = "CHECK_INTERVAL" - envImpersonateNode = "IMPERSONATE_NODE" - defaultCheckInterval = 30 * time.Second - defaultHTTPTimeout = 10 * time.Second + envNodeName = "NODE_NAME" + envConditionType = "CONDITION_TYPE" + envCheckEndpoint = "CHECK_ENDPOINT" + envCheckInterval = "CHECK_INTERVAL" + envImpersonateNode = "IMPERSONATE_NODE" + envHeartbeatPeriod = "HEARTBEAT_PERIOD" + defaultCheckInterval = 30 * time.Second + defaultHTTPTimeout = 10 * time.Second + defaultHeartbeatPeriod = 5 * time.Minute ) // HealthResponse represents the health check response structure. @@ -86,6 +88,19 @@ func main() { } } + heartbeatPeriodStr := os.Getenv(envHeartbeatPeriod) + heartbeatPeriod := defaultHeartbeatPeriod + if heartbeatPeriodStr != "" { + parsedPeriod, err := time.ParseDuration(heartbeatPeriodStr) + if err == nil { + heartbeatPeriod = parsedPeriod + } else { + klog.ErrorS(err, "Failed parse heartbeat period, using default", + "input", heartbeatPeriodStr, + "default", defaultHeartbeatPeriod) + } + } + // Create Kubernetes client config, err := rest.InClusterConfig() if err != nil { @@ -121,20 +136,20 @@ func main() { defer ticker.Stop() // Run immediately on startup, then on each tick - runCheck(ctx, httpClient, clientset, checkEndpoint, nodeName, conditionType) + runCheck(ctx, httpClient, clientset, checkEndpoint, nodeName, conditionType, heartbeatPeriod) for { select { case <-ctx.Done(): klog.InfoS("Shutting down readiness condition reporter", "reason", ctx.Err()) return case <-ticker.C: - runCheck(ctx, httpClient, clientset, checkEndpoint, nodeName, conditionType) + runCheck(ctx, httpClient, clientset, checkEndpoint, nodeName, conditionType, heartbeatPeriod) } } } // runCheck performs a single health check and updates the node condition. -func runCheck(ctx context.Context, httpClient *http.Client, clientset kubernetes.Interface, checkEndpoint, nodeName, conditionType string) { +func runCheck(ctx context.Context, httpClient *http.Client, clientset kubernetes.Interface, checkEndpoint, nodeName, conditionType string, heartbeatPeriod time.Duration) { health, err := checkHealth(ctx, httpClient, checkEndpoint) if err != nil { klog.ErrorS(err, "Health check failed", "endpoint", checkEndpoint) @@ -145,7 +160,7 @@ func runCheck(ctx context.Context, httpClient *http.Client, clientset kubernetes } } - if err := updateNodeCondition(ctx, clientset, nodeName, conditionType, health); err != nil { + if err := updateNodeCondition(ctx, clientset, nodeName, conditionType, health, heartbeatPeriod); err != nil { klog.ErrorS(err, "Failed to update node condition", "node", nodeName, "condition", conditionType) } } @@ -198,7 +213,7 @@ func checkHealth(ctx context.Context, client *http.Client, endpoint string) (*He } // updateNodeCondition updates the node condition based on health check. -func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeName, conditionType string, health *HealthResponse) error { +func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeName, conditionType string, health *HealthResponse, heartbeatPeriod time.Duration) error { return retry.RetryOnConflict(retry.DefaultRetry, func() error { // Get the node node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) @@ -237,13 +252,14 @@ func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeN NOTE: Skipping the write stops refreshing the LastHeartbeatTime on every tick. To mitigate this, force an update every 5 minutes even if the state is unchanged. */ - if time.Since(existingCondition.LastHeartbeatTime.Time) >= 5*time.Minute { + if time.Since(existingCondition.LastHeartbeatTime.Time) >= heartbeatPeriod { needsUpdate = true } } if !needsUpdate { - // state has not changed for 5 mins, skip the write + // state has not changed for specified period, skip the write + klog.V(4).InfoS("Condition state unchanged, skipping node status update", "node", nodeName, "condition", conditionType) return nil } From cb80ba271ebd538a6a690e4a1641a1279c4c6978 Mon Sep 17 00:00:00 2001 From: LightCreator1007 Date: Mon, 25 May 2026 23:08:50 +0530 Subject: [PATCH 6/6] test: extended the existing TestUpdateNodeCondition to cover the new cases --- cmd/readiness-condition-reporter/main_test.go | 199 ++++++++---------- 1 file changed, 90 insertions(+), 109 deletions(-) diff --git a/cmd/readiness-condition-reporter/main_test.go b/cmd/readiness-condition-reporter/main_test.go index 90b0781..a3d00ab 100644 --- a/cmd/readiness-condition-reporter/main_test.go +++ b/cmd/readiness-condition-reporter/main_test.go @@ -109,13 +109,16 @@ func TestCheckHealthCancelledContext(t *testing.T) { func TestUpdateNodeCondition(t *testing.T) { nodeName := "test-node" conditionType := "TestCondition" + staleTime := time.Now().Add(-6 * time.Minute) tests := []struct { - name string - existingNode *corev1.Node - health *HealthResponse - wantStatus corev1.ConditionStatus - wantReason string + name string + existingNode *corev1.Node + health *HealthResponse + heartbeatPeriod time.Duration + wantStatus corev1.ConditionStatus + wantReason string + wantUpdateCalled bool }{ { name: "New Condition Healthy", @@ -127,11 +130,13 @@ func TestUpdateNodeCondition(t *testing.T) { Reason: "EndpointOK", Message: "All good", }, - wantStatus: corev1.ConditionTrue, - wantReason: "EndpointOK", + heartbeatPeriod: 5 * time.Minute, + wantStatus: corev1.ConditionTrue, + wantReason: "EndpointOK", + wantUpdateCalled: true, }, { - name: "Update Condition to Unhealthy", + name: "State change triggers immediate write", existingNode: &corev1.Node{ ObjectMeta: metav1.ObjectMeta{Name: nodeName}, Status: corev1.NodeStatus{ @@ -148,8 +153,64 @@ func TestUpdateNodeCondition(t *testing.T) { Reason: "HealthCheckFailed", Message: "Something failed", }, - wantStatus: corev1.ConditionFalse, - wantReason: "HealthCheckFailed", + heartbeatPeriod: 5 * time.Minute, + wantStatus: corev1.ConditionFalse, + wantReason: "HealthCheckFailed", + wantUpdateCalled: true, + }, + { + name: "State unchanged: Fresh heartbeat (skip write)", + existingNode: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: nodeName}, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeConditionType(conditionType), + Status: corev1.ConditionTrue, + Reason: "EndpointOk", + Message: "All good", + LastHeartbeatTime: metav1.NewTime(time.Now()), + LastTransitionTime: metav1.NewTime(time.Now()), + }, + }, + }, + }, + health: &HealthResponse{ + Healthy: true, + Reason: "EndpointOk", + Message: "All good", + }, + heartbeatPeriod: 5 * time.Minute, + wantStatus: corev1.ConditionTrue, + wantReason: "EndpointOk", + wantUpdateCalled: false, + }, + { + name: "State unchanged: Stale heartbeat (force write)", + existingNode: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: nodeName}, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeConditionType(conditionType), + Status: corev1.ConditionTrue, + Reason: "EndpointOk", + Message: "All good", + LastHeartbeatTime: metav1.NewTime(staleTime), + LastTransitionTime: metav1.NewTime(staleTime), + }, + }, + }, + }, + health: &HealthResponse{ + Healthy: true, + Reason: "EndpointOk", + Message: "All good", + }, + heartbeatPeriod: 5 * time.Minute, + wantStatus: corev1.ConditionTrue, + wantReason: "EndpointOk", + wantUpdateCalled: true, }, } @@ -157,11 +218,29 @@ func TestUpdateNodeCondition(t *testing.T) { t.Run(tt.name, func(t *testing.T) { client := fake.NewSimpleClientset(tt.existingNode) - err := updateNodeCondition(context.Background(), client, nodeName, conditionType, tt.health) + countUpdates := func() int { + n := 0 + for _, a := range client.Actions() { + if a.GetVerb() == "update" && a.GetSubresource() == "status" && a.GetResource().Resource == "nodes" { + n++ + } + } + return n + } + + err := updateNodeCondition(context.Background(), client, nodeName, conditionType, tt.health, tt.heartbeatPeriod) if err != nil { t.Errorf("updateNodeCondition() error = %v", err) } + // Assert API call frequency + updateCount := countUpdates() + if tt.wantUpdateCalled && updateCount == 0 { + t.Errorf("Expected UpdateStatus to be called, but it was skipped") + } else if !tt.wantUpdateCalled && updateCount > 0 { + t.Errorf("Expected UpdateStatus to be skipped, but it was called %d times", updateCount) + } + updatedNode, err := client.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) if err != nil { t.Fatalf("Failed to get node: %v", err) @@ -188,101 +267,3 @@ func TestUpdateNodeCondition(t *testing.T) { }) } } - -func TestUpdateNodeCondition_SkipsWriteWhenStateUnchanged(t *testing.T) { - nodeName := "test-node" - conditionType := "TestCondition" - health := &HealthResponse{Healthy: true, Reason: "EndpointOK", Message: "All good"} - - countUpdates := func(c *fake.Clientset) int { - n := 0 - for _, a := range c.Actions() { - if a.GetVerb() == "update" && a.GetSubresource() == "status" && a.GetResource().Resource == "nodes" { - n++ - } - } - return n - } - - // Part 1: Fresh heartbeat, state unchanged -> skip the API write. - node := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: nodeName}, - Status: corev1.NodeStatus{ - Conditions: []corev1.NodeCondition{ - { - Type: corev1.NodeConditionType(conditionType), - Status: corev1.ConditionTrue, - Reason: "EndpointOK", - Message: "All good", - LastHeartbeatTime: metav1.NewTime(time.Now()), - LastTransitionTime: metav1.NewTime(time.Now()), - }, - }, - }, - } - client := fake.NewSimpleClientset(node) - - if err := updateNodeCondition(context.Background(), client, nodeName, conditionType, health); err != nil { - t.Fatalf("fresh heartbeat: updateNodeCondition() error = %v", err) - } - - if got := countUpdates(client); got != 0 { - t.Errorf("fresh heartbeat: expected 0 UpdateStatus calls (skip), got %d", got) - } - - fetchedNode, err := client.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) - if err != nil { - t.Fatalf("fresh heartbeat: failed to fetch node: %v", err) - } - for _, c := range fetchedNode.Status.Conditions { - if string(c.Type) == conditionType { - if c.LastHeartbeatTime.Time != node.Status.Conditions[0].LastHeartbeatTime.Time { - t.Errorf("fresh heartbeat: heartbeat was mutated despite skip; got %v, want %v", - c.LastHeartbeatTime.Time, node.Status.Conditions[0].LastHeartbeatTime.Time) - } - break - } - } - - // Part 2: Stale heartbeat (>=5min), state unchanged -> force the API write. - staleTime := time.Now().Add(-6 * time.Minute) - staleNode := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: nodeName}, - Status: corev1.NodeStatus{ - Conditions: []corev1.NodeCondition{ - { - Type: corev1.NodeConditionType(conditionType), - Status: corev1.ConditionTrue, - Reason: "EndpointOK", - Message: "All good", - LastHeartbeatTime: metav1.NewTime(staleTime), - LastTransitionTime: metav1.NewTime(staleTime), - }, - }, - }, - } - staleClient := fake.NewSimpleClientset(staleNode) - - if err := updateNodeCondition(context.Background(), staleClient, nodeName, conditionType, health); err != nil { - t.Fatalf("stale heartbeat: updateNodeCondition() error = %v", err) - } - - if got := countUpdates(staleClient); got != 1 { - t.Errorf("stale heartbeat: expected 1 UpdateStatus call (forced refresh), got %d", got) - } - - refreshedNode, err := staleClient.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) - if err != nil { - t.Fatalf("stale heartbeat: failed to fetch node: %v", err) - } - for _, c := range refreshedNode.Status.Conditions { - if string(c.Type) == conditionType { - if !c.LastHeartbeatTime.After(staleTime) { - t.Errorf("stale heartbeat: expected heartbeat refreshed past %v, got %v", - staleTime, c.LastHeartbeatTime.Time) - } - return - } - } - t.Fatal("stale heartbeat: condition not found after forced refresh") -}