diff --git a/cmd/readiness-condition-reporter/main.go b/cmd/readiness-condition-reporter/main.go index 477cb8a..6293752 100644 --- a/cmd/readiness-condition-reporter/main.go +++ b/cmd/readiness-condition-reporter/main.go @@ -35,13 +35,15 @@ import ( ) const ( - envNodeName = "NODE_NAME" - envConditionType = "CONDITION_TYPE" - envCheckEndpoint = "CHECK_ENDPOINT" - envCheckInterval = "CHECK_INTERVAL" - envImpersonateNode = "IMPERSONATE_NODE" - defaultCheckInterval = 30 * time.Second - defaultHTTPTimeout = 10 * time.Second + envNodeName = "NODE_NAME" + envConditionType = "CONDITION_TYPE" + envCheckEndpoint = "CHECK_ENDPOINT" + envCheckInterval = "CHECK_INTERVAL" + envImpersonateNode = "IMPERSONATE_NODE" + envHeartbeatPeriod = "HEARTBEAT_PERIOD" + defaultCheckInterval = 30 * time.Second + defaultHTTPTimeout = 10 * time.Second + defaultHeartbeatPeriod = 5 * time.Minute ) // HealthResponse represents the health check response structure. @@ -86,6 +88,19 @@ func main() { } } + heartbeatPeriodStr := os.Getenv(envHeartbeatPeriod) + heartbeatPeriod := defaultHeartbeatPeriod + if heartbeatPeriodStr != "" { + parsedPeriod, err := time.ParseDuration(heartbeatPeriodStr) + if err == nil { + heartbeatPeriod = parsedPeriod + } else { + klog.ErrorS(err, "Failed parse heartbeat period, using default", + "input", heartbeatPeriodStr, + "default", defaultHeartbeatPeriod) + } + } + // Create Kubernetes client config, err := rest.InClusterConfig() if err != nil { @@ -121,20 +136,20 @@ func main() { defer ticker.Stop() // Run immediately on startup, then on each tick - runCheck(ctx, httpClient, clientset, checkEndpoint, nodeName, conditionType) + runCheck(ctx, httpClient, clientset, checkEndpoint, nodeName, conditionType, heartbeatPeriod) for { select { case <-ctx.Done(): klog.InfoS("Shutting down readiness condition reporter", "reason", ctx.Err()) return case <-ticker.C: - runCheck(ctx, httpClient, clientset, checkEndpoint, nodeName, conditionType) + runCheck(ctx, httpClient, clientset, checkEndpoint, nodeName, conditionType, heartbeatPeriod) } } } // runCheck performs a single health check and updates the node condition. -func runCheck(ctx context.Context, httpClient *http.Client, clientset kubernetes.Interface, checkEndpoint, nodeName, conditionType string) { +func runCheck(ctx context.Context, httpClient *http.Client, clientset kubernetes.Interface, checkEndpoint, nodeName, conditionType string, heartbeatPeriod time.Duration) { health, err := checkHealth(ctx, httpClient, checkEndpoint) if err != nil { klog.ErrorS(err, "Health check failed", "endpoint", checkEndpoint) @@ -145,7 +160,7 @@ func runCheck(ctx context.Context, httpClient *http.Client, clientset kubernetes } } - if err := updateNodeCondition(ctx, clientset, nodeName, conditionType, health); err != nil { + if err := updateNodeCondition(ctx, clientset, nodeName, conditionType, health, heartbeatPeriod); err != nil { klog.ErrorS(err, "Failed to update node condition", "node", nodeName, "condition", conditionType) } } @@ -198,7 +213,7 @@ func checkHealth(ctx context.Context, client *http.Client, endpoint string) (*He } // updateNodeCondition updates the node condition based on health check. -func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeName, conditionType string, health *HealthResponse) error { +func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeName, conditionType string, health *HealthResponse, heartbeatPeriod time.Duration) error { return retry.RetryOnConflict(retry.DefaultRetry, func() error { // Get the node node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) @@ -215,8 +230,12 @@ func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeN // Find existing condition to preserve transition time if status hasn't changed var transitionTime metav1.Time + var existingCondition *corev1.NodeCondition + for _, condition := range node.Status.Conditions { if string(condition.Type) == conditionType { + condCopy := condition + existingCondition = &condCopy if condition.Status == status { transitionTime = condition.LastTransitionTime } @@ -224,6 +243,26 @@ func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeN } } + // If the semantic state is completely unchanged, bypass the API write + // to prevent etcd write amplification and control plane flooding. + needsUpdate := true + if existingCondition != nil && existingCondition.Status == status && existingCondition.Reason == health.Reason && existingCondition.Message == health.Message { + needsUpdate = false + /* + NOTE: Skipping the write stops refreshing the LastHeartbeatTime on every tick. + To mitigate this, force an update every 5 minutes even if the state is unchanged. + */ + if time.Since(existingCondition.LastHeartbeatTime.Time) >= heartbeatPeriod { + needsUpdate = true + } + } + + if !needsUpdate { + // state has not changed for specified period, skip the write + klog.V(4).InfoS("Condition state unchanged, skipping node status update", "node", nodeName, "condition", conditionType) + return nil + } + if transitionTime.IsZero() { transitionTime = now } diff --git a/cmd/readiness-condition-reporter/main_test.go b/cmd/readiness-condition-reporter/main_test.go index 8706e86..a3d00ab 100644 --- a/cmd/readiness-condition-reporter/main_test.go +++ b/cmd/readiness-condition-reporter/main_test.go @@ -109,13 +109,16 @@ func TestCheckHealthCancelledContext(t *testing.T) { func TestUpdateNodeCondition(t *testing.T) { nodeName := "test-node" conditionType := "TestCondition" + staleTime := time.Now().Add(-6 * time.Minute) tests := []struct { - name string - existingNode *corev1.Node - health *HealthResponse - wantStatus corev1.ConditionStatus - wantReason string + name string + existingNode *corev1.Node + health *HealthResponse + heartbeatPeriod time.Duration + wantStatus corev1.ConditionStatus + wantReason string + wantUpdateCalled bool }{ { name: "New Condition Healthy", @@ -127,11 +130,13 @@ func TestUpdateNodeCondition(t *testing.T) { Reason: "EndpointOK", Message: "All good", }, - wantStatus: corev1.ConditionTrue, - wantReason: "EndpointOK", + heartbeatPeriod: 5 * time.Minute, + wantStatus: corev1.ConditionTrue, + wantReason: "EndpointOK", + wantUpdateCalled: true, }, { - name: "Update Condition to Unhealthy", + name: "State change triggers immediate write", existingNode: &corev1.Node{ ObjectMeta: metav1.ObjectMeta{Name: nodeName}, Status: corev1.NodeStatus{ @@ -148,8 +153,64 @@ func TestUpdateNodeCondition(t *testing.T) { Reason: "HealthCheckFailed", Message: "Something failed", }, - wantStatus: corev1.ConditionFalse, - wantReason: "HealthCheckFailed", + heartbeatPeriod: 5 * time.Minute, + wantStatus: corev1.ConditionFalse, + wantReason: "HealthCheckFailed", + wantUpdateCalled: true, + }, + { + name: "State unchanged: Fresh heartbeat (skip write)", + existingNode: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: nodeName}, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeConditionType(conditionType), + Status: corev1.ConditionTrue, + Reason: "EndpointOk", + Message: "All good", + LastHeartbeatTime: metav1.NewTime(time.Now()), + LastTransitionTime: metav1.NewTime(time.Now()), + }, + }, + }, + }, + health: &HealthResponse{ + Healthy: true, + Reason: "EndpointOk", + Message: "All good", + }, + heartbeatPeriod: 5 * time.Minute, + wantStatus: corev1.ConditionTrue, + wantReason: "EndpointOk", + wantUpdateCalled: false, + }, + { + name: "State unchanged: Stale heartbeat (force write)", + existingNode: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: nodeName}, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeConditionType(conditionType), + Status: corev1.ConditionTrue, + Reason: "EndpointOk", + Message: "All good", + LastHeartbeatTime: metav1.NewTime(staleTime), + LastTransitionTime: metav1.NewTime(staleTime), + }, + }, + }, + }, + health: &HealthResponse{ + Healthy: true, + Reason: "EndpointOk", + Message: "All good", + }, + heartbeatPeriod: 5 * time.Minute, + wantStatus: corev1.ConditionTrue, + wantReason: "EndpointOk", + wantUpdateCalled: true, }, } @@ -157,11 +218,29 @@ func TestUpdateNodeCondition(t *testing.T) { t.Run(tt.name, func(t *testing.T) { client := fake.NewSimpleClientset(tt.existingNode) - err := updateNodeCondition(context.Background(), client, nodeName, conditionType, tt.health) + countUpdates := func() int { + n := 0 + for _, a := range client.Actions() { + if a.GetVerb() == "update" && a.GetSubresource() == "status" && a.GetResource().Resource == "nodes" { + n++ + } + } + return n + } + + err := updateNodeCondition(context.Background(), client, nodeName, conditionType, tt.health, tt.heartbeatPeriod) if err != nil { t.Errorf("updateNodeCondition() error = %v", err) } + // Assert API call frequency + updateCount := countUpdates() + if tt.wantUpdateCalled && updateCount == 0 { + t.Errorf("Expected UpdateStatus to be called, but it was skipped") + } else if !tt.wantUpdateCalled && updateCount > 0 { + t.Errorf("Expected UpdateStatus to be skipped, but it was called %d times", updateCount) + } + updatedNode, err := client.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{}) if err != nil { t.Fatalf("Failed to get node: %v", err)