From b65cd2bcc487a45f1aa9619fc65180eae49285a8 Mon Sep 17 00:00:00 2001 From: andrefun Date: Sat, 25 Apr 2026 13:30:18 +0800 Subject: [PATCH 1/2] Update core.lua fix(discovery/kubernetes): preserve cached endpoints when payload is empty --- apisix/discovery/kubernetes/core.lua | 77 ++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/apisix/discovery/kubernetes/core.lua b/apisix/discovery/kubernetes/core.lua index 1ba74408d9e2..5f56fa53d80b 100644 --- a/apisix/discovery/kubernetes/core.lua +++ b/apisix/discovery/kubernetes/core.lua @@ -346,6 +346,20 @@ function _M.create_endpoint_callbacks(options) end core.log.debug("get endpoint_slice: ", core.json.delay_encode(endpoint_slice)) + + -- Defensive logging: capture slices whose endpoints list is empty, for + -- diagnosis (see on_endpoint_modified for the full rationale). + local slice_eps_for_log = endpoint_slice.endpoints + if slice_eps_for_log == ngx.null then + slice_eps_for_log = nil + end + if not slice_eps_for_log or #slice_eps_for_log == 0 then + core.log.warn("kubernetes discovery: endpoint_slice has no endpoints, ", + "namespace=", endpoint_slice.metadata.namespace, + ", name=", endpoint_slice.metadata.name, + ", svc=", endpoint_slice.metadata.labels[kubernetes_service_name_label]) + end + local port_to_nodes = {} local slice_endpoints = endpoint_slice.endpoints @@ -394,6 +408,25 @@ function _M.create_endpoint_callbacks(options) handle, endpoint_key, port_to_nodes, endpoint_slice.metadata.name) local cached_endpoints = get_endpoints_from_cache(handle, endpoint_key) + + -- Guard: if the merged result across all cached slices is empty (no + -- ports / no nodes), skip the dict write so that a transient + -- "all not-ready" state does not invalidate the existing routable + -- endpoint list. The legitimate "service is gone" state is + -- handled by on_endpoint_slices_deleted, which actually removes the + -- key. 
Active health checks remain responsible for evicting truly + -- dead nodes that are still in the dict. + if not next(cached_endpoints) then + core.log.warn("kubernetes discovery: skip empty endpoint update for ", + endpoint_key, " (likely a transient k8s reconcile state); ", + "preserving previous endpoints") + if operate == "list" then + handle.current_keys_hash[endpoint_key] = true + handle.current_keys_hash[endpoint_key .. "#version"] = true + end + return + end + for _, nodes in pairs(cached_endpoints) do core.table.sort(nodes, sort_nodes_cmp) end @@ -454,6 +487,29 @@ function _M.create_endpoint_callbacks(options) end core.log.debug(core.json.delay_encode(endpoint)) + + -- Defensive logging: capture transient "all not-ready" states. + -- The Kubernetes endpoints controller may briefly publish an Endpoints + -- object whose `subsets[*].addresses` is empty while pods are + -- terminating / readiness-flapping; the object still has subsets, + -- but they carry notReadyAddresses only. The downstream loop only + -- considers subset.addresses, so the resulting endpoint_buffer ends + -- up empty even though the upstream still has live pods. 
+ local subsets_for_log = endpoint.subsets + local has_ready_address = false + for _, subset in ipairs(subsets_for_log or {}) do + if subset.addresses and #subset.addresses > 0 then + has_ready_address = true + break + end + end + if not has_ready_address then + core.log.warn("kubernetes discovery: endpoint has no ready addresses, ", + "namespace=", endpoint.metadata.namespace, + ", name=", endpoint.metadata.name, + ", subsets=", #(subsets_for_log or {})) + end + core.table.clear(endpoint_buffer) local subsets = endpoint.subsets @@ -497,6 +553,27 @@ function _M.create_endpoint_callbacks(options) local endpoint_key = build_endpoint_key( key_prefix, endpoint.metadata.namespace, endpoint.metadata.name) + + -- Guard: if no port has any node (typically because subset.addresses + -- was empty for every subset), do not overwrite the dict with an + -- empty payload. An empty payload would cause subsequent requests + -- to receive `no valid upstream node: nil` (HTTP 503) until the next + -- update arrives, even though the upstream may still be servable + -- via the previously-cached nodes. The legitimate "service is gone" + -- state is handled by on_endpoint_deleted, which actually removes + -- the key. Active health checks remain responsible for evicting + -- truly dead nodes that are still in the dict. + if not next(endpoint_buffer) then + core.log.warn("kubernetes discovery: skip empty endpoint update for ", + endpoint_key, " (likely a transient k8s reconcile state); ", + "preserving previous endpoints") + if operate == "list" then + handle.current_keys_hash[endpoint_key] = true + handle.current_keys_hash[endpoint_key .. 
"#version"] = true + end + return + end + local ok, err = _M.update_endpoint_dict(handle, endpoint_buffer, endpoint_key) if not ok then core.log.error("failed to update endpoint dict for endpoint: ", endpoint_key, From 3c1bf5183bd907c45af300ed744067ce9791796d Mon Sep 17 00:00:00 2001 From: andrefun Date: Sat, 25 Apr 2026 13:33:42 +0800 Subject: [PATCH 2/2] Create kubernetes_empty_endpoint.t test: add unit test for empty endpoint guard --- t/discovery/kubernetes_empty_endpoint.t | 191 ++++++++++++++++++++++++ 1 file changed, 191 insertions(+) create mode 100644 t/discovery/kubernetes_empty_endpoint.t diff --git a/t/discovery/kubernetes_empty_endpoint.t b/t/discovery/kubernetes_empty_endpoint.t new file mode 100644 index 000000000000..2fc3b5ac9dc1 --- /dev/null +++ b/t/discovery/kubernetes_empty_endpoint.t @@ -0,0 +1,191 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); +log_level("warn"); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->request) { + $block->set_value("request", "GET /t"); + } + + if (!$block->error_log && !$block->no_error_log) { + $block->set_value("no_error_log", "[error]\n[alert]"); + } +}); + +run_tests; + +__DATA__ + +=== TEST 1: on_endpoint_modified preserves previous endpoints when payload is empty +--- config + location /t { + content_by_lua_block { + local k8s_core = require("apisix.discovery.kubernetes.core") + local cbs = k8s_core.create_endpoint_callbacks({}) + + local handle = { + endpoint_dict = ngx.shared["kubernetes"], + default_weight = 50, + namespace_selector = nil, + endpoint_slices_cache = {}, + current_keys_hash = {}, + } + + -- 1) seed with a normal endpoint + local normal = { + metadata = { namespace = "default", name = "svc1" }, + subsets = { + { + addresses = { { ip = "10.0.0.1" } }, + ports = { { name = "http", port = 80 } }, + }, + }, + } + cbs.on_endpoint_modified(handle, normal) + local v1 = handle.endpoint_dict:get("default/svc1#version") + local c1 = handle.endpoint_dict:get("default/svc1") + assert(v1 ~= nil, "version should be set after first update") + assert(c1 and c1:find("10.0.0.1"), "content should contain 10.0.0.1") + ngx.say("seeded version=", tostring(v1 ~= nil), " content_has_ip=", tostring(c1:find("10.0.0.1") ~= nil)) + + -- 2) deliver a "transient empty" event: subsets exist but addresses is nil + local transient_empty = { + metadata = { namespace = "default", name = "svc1" }, + subsets = { + { + notReadyAddresses = { { ip = "10.0.0.1" } }, + ports = { { name = "http", port = 80 } }, + }, + }, + } + cbs.on_endpoint_modified(handle, transient_empty) + + -- 3) the previous endpoint must still be present (this is the bug fix) + local v2 = handle.endpoint_dict:get("default/svc1#version") + local c2 = handle.endpoint_dict:get("default/svc1") + assert(v2 == v1, 
"version must be unchanged after empty update; got " .. tostring(v2)) + assert(c2 == c1, "content must be unchanged after empty update") + ngx.say("after empty: version_unchanged=", tostring(v2 == v1), + " content_unchanged=", tostring(c2 == c1)) + + -- 4) deliver an entirely empty subsets payload too + local subsets_empty = { + metadata = { namespace = "default", name = "svc1" }, + subsets = {}, + } + cbs.on_endpoint_modified(handle, subsets_empty) + local v3 = handle.endpoint_dict:get("default/svc1#version") + assert(v3 == v1, "version must be unchanged after subsets=[] update") + ngx.say("after subsets=[]: version_unchanged=", tostring(v3 == v1)) + + -- 5) a real new endpoint event still updates as expected + local new = { + metadata = { namespace = "default", name = "svc1" }, + subsets = { + { + addresses = { { ip = "10.0.0.2" } }, + ports = { { name = "http", port = 80 } }, + }, + }, + } + cbs.on_endpoint_modified(handle, new) + local c4 = handle.endpoint_dict:get("default/svc1") + assert(c4 and c4:find("10.0.0.2"), "content should be updated to 10.0.0.2") + ngx.say("after recover: content_has_new_ip=", tostring(c4:find("10.0.0.2") ~= nil)) + } + } +--- response_body +seeded version=true content_has_ip=true +after empty: version_unchanged=true content_unchanged=true +after subsets=[]: version_unchanged=true +after recover: content_has_new_ip=true +--- error_log +kubernetes discovery: endpoint has no ready addresses +kubernetes discovery: skip empty endpoint update for default/svc1 + + + +=== TEST 2: on_endpoint_slices_modified preserves previous endpoints when no slice has ready endpoints +--- config + location /t { + content_by_lua_block { + local k8s_core = require("apisix.discovery.kubernetes.core") + local cbs = k8s_core.create_endpoint_callbacks({}) + + local handle = { + endpoint_dict = ngx.shared["kubernetes"], + default_weight = 50, + namespace_selector = nil, + endpoint_slices_cache = {}, + current_keys_hash = {}, + } + + -- 1) seed with a normal 
slice + local seed_slice = { + metadata = { + namespace = "default", + name = "svc1-slice-aaa", + labels = { ["kubernetes.io/service-name"] = "svc1" }, + }, + endpoints = { + { + addresses = { "10.0.0.1" }, + conditions = { ready = true }, + }, + }, + ports = { { name = "http", port = 80 } }, + } + cbs.on_endpoint_slices_modified(handle, seed_slice) + local v1 = handle.endpoint_dict:get("default/svc1#version") + local c1 = handle.endpoint_dict:get("default/svc1") + assert(v1, "version should be set") + assert(c1 and c1:find("10.0.0.1"), "content should contain 10.0.0.1") + + -- 2) deliver an update where the same slice now has an unready endpoint + local empty_slice = { + metadata = { + namespace = "default", + name = "svc1-slice-aaa", + labels = { ["kubernetes.io/service-name"] = "svc1" }, + }, + endpoints = { + { + addresses = { "10.0.0.1" }, + conditions = { ready = false }, + }, + }, + ports = { { name = "http", port = 80 } }, + } + cbs.on_endpoint_slices_modified(handle, empty_slice) + local v2 = handle.endpoint_dict:get("default/svc1#version") + local c2 = handle.endpoint_dict:get("default/svc1") + assert(v2 == v1, "version must be unchanged after all-not-ready slice update") + assert(c2 == c1, "content must be unchanged after all-not-ready slice update") + ngx.say("ok preserved=", tostring(v2 == v1 and c2 == c1)) + } + } +--- response_body +ok preserved=true +--- error_log +kubernetes discovery: skip empty endpoint update for default/svc1