Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 51 additions & 12 deletions cmd/readiness-condition-reporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ import (
)

const (
envNodeName = "NODE_NAME"
envConditionType = "CONDITION_TYPE"
envCheckEndpoint = "CHECK_ENDPOINT"
envCheckInterval = "CHECK_INTERVAL"
envImpersonateNode = "IMPERSONATE_NODE"
defaultCheckInterval = 30 * time.Second
defaultHTTPTimeout = 10 * time.Second
envNodeName = "NODE_NAME"
envConditionType = "CONDITION_TYPE"
envCheckEndpoint = "CHECK_ENDPOINT"
envCheckInterval = "CHECK_INTERVAL"
envImpersonateNode = "IMPERSONATE_NODE"
envHeartbeatPeriod = "HEARTBEAT_PERIOD"
defaultCheckInterval = 30 * time.Second
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the intent is to reduce the frequency of readiness updates, why it cannot be set with a different check interval?

Could we first clarify the concept of 'heart-beat'? Would there be a case when the component health check be checked often but not update the API server know if it degraded?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the intent is to reduce the frequency of readiness updates, why it cannot be set with a different check interval?

A different check interval (in this case a larger one) would mean that if the node status changes we won't be able to detect it immediately. We need to check the node status frequently for fast detection, but if the state stays stable/healthy for a long time, we may want to skip writing that identical state to the API server on every tick. Skipping those redundant writes prevents etcd write amplification.

Could we first clarify the concept of 'heart-beat'?

The heartbeat here would be a liveness proof, It ensures that if the component stays perfectly healthy for a long time, we still write an update every 5 minutes just to bump the LastHeartbeatTime, proving to the API server that the reporter hasn't crashed.

Would there be a case when the component health check be checked often but not update the API server know if it degraded?

I do not think there is a case where a degraded state would be missed or delayed. If the health check degrades (Status, Reason, or Message changes), the idempotency gate would instantly open and the API server is updated on that exact tick.

defaultHTTPTimeout = 10 * time.Second
defaultHeartbeatPeriod = 5 * time.Minute
)

// HealthResponse represents the health check response structure.
Expand Down Expand Up @@ -86,6 +88,19 @@ func main() {
}
}

heartbeatPeriodStr := os.Getenv(envHeartbeatPeriod)
heartbeatPeriod := defaultHeartbeatPeriod
if heartbeatPeriodStr != "" {
parsedPeriod, err := time.ParseDuration(heartbeatPeriodStr)
if err == nil {
heartbeatPeriod = parsedPeriod
} else {
klog.ErrorS(err, "Failed parse heartbeat period, using default",
"input", heartbeatPeriodStr,
"default", defaultHeartbeatPeriod)
}
}

// Create Kubernetes client
config, err := rest.InClusterConfig()
if err != nil {
Expand Down Expand Up @@ -121,20 +136,20 @@ func main() {
defer ticker.Stop()

// Run immediately on startup, then on each tick
runCheck(ctx, httpClient, clientset, checkEndpoint, nodeName, conditionType)
runCheck(ctx, httpClient, clientset, checkEndpoint, nodeName, conditionType, heartbeatPeriod)
for {
select {
case <-ctx.Done():
klog.InfoS("Shutting down readiness condition reporter", "reason", ctx.Err())
return
case <-ticker.C:
runCheck(ctx, httpClient, clientset, checkEndpoint, nodeName, conditionType)
runCheck(ctx, httpClient, clientset, checkEndpoint, nodeName, conditionType, heartbeatPeriod)
}
}
}

// runCheck performs a single health check and updates the node condition.
func runCheck(ctx context.Context, httpClient *http.Client, clientset kubernetes.Interface, checkEndpoint, nodeName, conditionType string) {
func runCheck(ctx context.Context, httpClient *http.Client, clientset kubernetes.Interface, checkEndpoint, nodeName, conditionType string, heartbeatPeriod time.Duration) {
health, err := checkHealth(ctx, httpClient, checkEndpoint)
if err != nil {
klog.ErrorS(err, "Health check failed", "endpoint", checkEndpoint)
Expand All @@ -145,7 +160,7 @@ func runCheck(ctx context.Context, httpClient *http.Client, clientset kubernetes
}
}

if err := updateNodeCondition(ctx, clientset, nodeName, conditionType, health); err != nil {
if err := updateNodeCondition(ctx, clientset, nodeName, conditionType, health, heartbeatPeriod); err != nil {
klog.ErrorS(err, "Failed to update node condition", "node", nodeName, "condition", conditionType)
}
}
Expand Down Expand Up @@ -198,7 +213,7 @@ func checkHealth(ctx context.Context, client *http.Client, endpoint string) (*He
}

// updateNodeCondition updates the node condition based on health check.
func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeName, conditionType string, health *HealthResponse) error {
func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeName, conditionType string, health *HealthResponse, heartbeatPeriod time.Duration) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Get the node
node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
Expand All @@ -215,15 +230,39 @@ func updateNodeCondition(ctx context.Context, client kubernetes.Interface, nodeN

// Find existing condition to preserve transition time if status hasn't changed
var transitionTime metav1.Time
var existingCondition *corev1.NodeCondition

for _, condition := range node.Status.Conditions {
if string(condition.Type) == conditionType {
condCopy := condition
existingCondition = &condCopy
if condition.Status == status {
transitionTime = condition.LastTransitionTime
}
break
}
}

// If the semantic state is completely unchanged, bypass the API write
// to prevent etcd write amplification and control plane flooding.
needsUpdate := true
if existingCondition != nil && existingCondition.Status == status && existingCondition.Reason == health.Reason && existingCondition.Message == health.Message {
needsUpdate = false
/*
NOTE: Skipping the write stops refreshing the LastHeartbeatTime on every tick.
To mitigate this, force an update every 5 minutes even if the state is unchanged.
*/
if time.Since(existingCondition.LastHeartbeatTime.Time) >= heartbeatPeriod {
needsUpdate = true
}
}

if !needsUpdate {
// state has not changed for specified period, skip the write
klog.V(4).InfoS("Condition state unchanged, skipping node status update", "node", nodeName, "condition", conditionType)
return nil
Comment thread
LightCreator1007 marked this conversation as resolved.
Comment thread
LightCreator1007 marked this conversation as resolved.
}

if transitionTime.IsZero() {
transitionTime = now
}
Expand Down
101 changes: 90 additions & 11 deletions cmd/readiness-condition-reporter/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,16 @@ func TestCheckHealthCancelledContext(t *testing.T) {
func TestUpdateNodeCondition(t *testing.T) {
nodeName := "test-node"
conditionType := "TestCondition"
staleTime := time.Now().Add(-6 * time.Minute)

tests := []struct {
name string
existingNode *corev1.Node
health *HealthResponse
wantStatus corev1.ConditionStatus
wantReason string
name string
existingNode *corev1.Node
health *HealthResponse
heartbeatPeriod time.Duration
wantStatus corev1.ConditionStatus
wantReason string
wantUpdateCalled bool
}{
{
name: "New Condition Healthy",
Expand All @@ -127,11 +130,13 @@ func TestUpdateNodeCondition(t *testing.T) {
Reason: "EndpointOK",
Message: "All good",
},
wantStatus: corev1.ConditionTrue,
wantReason: "EndpointOK",
heartbeatPeriod: 5 * time.Minute,
wantStatus: corev1.ConditionTrue,
wantReason: "EndpointOK",
wantUpdateCalled: true,
},
{
name: "Update Condition to Unhealthy",
name: "State change triggers immediate write",
existingNode: &corev1.Node{
ObjectMeta: metav1.ObjectMeta{Name: nodeName},
Status: corev1.NodeStatus{
Expand All @@ -148,20 +153,94 @@ func TestUpdateNodeCondition(t *testing.T) {
Reason: "HealthCheckFailed",
Message: "Something failed",
},
wantStatus: corev1.ConditionFalse,
wantReason: "HealthCheckFailed",
heartbeatPeriod: 5 * time.Minute,
wantStatus: corev1.ConditionFalse,
wantReason: "HealthCheckFailed",
wantUpdateCalled: true,
},
{
name: "State unchanged: Fresh heartbeat (skip write)",
existingNode: &corev1.Node{
ObjectMeta: metav1.ObjectMeta{Name: nodeName},
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeConditionType(conditionType),
Status: corev1.ConditionTrue,
Reason: "EndpointOk",
Message: "All good",
LastHeartbeatTime: metav1.NewTime(time.Now()),
LastTransitionTime: metav1.NewTime(time.Now()),
},
},
},
},
health: &HealthResponse{
Healthy: true,
Reason: "EndpointOk",
Message: "All good",
},
heartbeatPeriod: 5 * time.Minute,
wantStatus: corev1.ConditionTrue,
wantReason: "EndpointOk",
wantUpdateCalled: false,
},
{
name: "State unchanged: Stale heartbeat (force write)",
existingNode: &corev1.Node{
ObjectMeta: metav1.ObjectMeta{Name: nodeName},
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeConditionType(conditionType),
Status: corev1.ConditionTrue,
Reason: "EndpointOk",
Message: "All good",
LastHeartbeatTime: metav1.NewTime(staleTime),
LastTransitionTime: metav1.NewTime(staleTime),
},
},
},
},
health: &HealthResponse{
Healthy: true,
Reason: "EndpointOk",
Message: "All good",
},
heartbeatPeriod: 5 * time.Minute,
wantStatus: corev1.ConditionTrue,
wantReason: "EndpointOk",
wantUpdateCalled: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client := fake.NewSimpleClientset(tt.existingNode)

err := updateNodeCondition(context.Background(), client, nodeName, conditionType, tt.health)
countUpdates := func() int {
n := 0
for _, a := range client.Actions() {
if a.GetVerb() == "update" && a.GetSubresource() == "status" && a.GetResource().Resource == "nodes" {
n++
}
}
return n
}

err := updateNodeCondition(context.Background(), client, nodeName, conditionType, tt.health, tt.heartbeatPeriod)
if err != nil {
t.Errorf("updateNodeCondition() error = %v", err)
}

// Assert API call frequency
updateCount := countUpdates()
if tt.wantUpdateCalled && updateCount == 0 {
t.Errorf("Expected UpdateStatus to be called, but it was skipped")
} else if !tt.wantUpdateCalled && updateCount > 0 {
t.Errorf("Expected UpdateStatus to be skipped, but it was called %d times", updateCount)
}

updatedNode, err := client.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get node: %v", err)
Expand Down