diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
index 24f42919ce05a..824e949e28876 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
@@ -16,8 +16,9 @@
  */
 package org.apache.spark.deploy.k8s
 
-import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.api.model.PodBuilder
 import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}
 import org.apache.hadoop.util.StringUtils
 
 import org.apache.spark.{SparkConf, SparkMasterRegex}
@@ -52,6 +53,7 @@ private[spark] class SparkKubernetesDiagnosticsSetter(clientProvider: Kubernetes
   extends SparkDiagnosticsSetter with Logging {
 
   private val KUBERNETES_EXIT_EXCEPTION_MESSAGE_LIMIT_BYTES = 64 * 1024 // 64 KiB
+  private val PATCH_CONTEXT = PatchContext.of(PatchType.STRATEGIC_MERGE)
 
   def this() = {
     this(new DefaultKubernetesClientProvider)
@@ -65,8 +67,8 @@ private[spark] class SparkKubernetesDiagnosticsSetter(clientProvider: Kubernetes
       client.pods()
         .inNamespace(conf.get(KUBERNETES_NAMESPACE))
         .withName(podName)
-        .edit((p: Pod) => new PodBuilder(p)
-          .editOrNewMetadata()
+        .patch(PATCH_CONTEXT, new PodBuilder()
+          .withNewMetadata()
           .addToAnnotations(EXIT_EXCEPTION_ANNOTATION, diagnostics)
           .endMetadata()
           .build());
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala
index 65c150b2a035b..aa44eb4d9ea89 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala
@@ -16,11 +16,10 @@
  */
 package org.apache.spark.deploy.k8s
 
-import java.util.function.UnaryOperator
-
 import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
+import io.fabric8.kubernetes.client.dsl.base.PatchContext
 import org.apache.hadoop.util.StringUtils
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.any
@@ -75,15 +74,10 @@ class SparkKubernetesDiagnosticsSetterSuite extends SparkFunSuite
 
     setter.setDiagnostics(diagnostics, conf)
 
-    val captor: ArgumentCaptor[UnaryOperator[Pod]] =
-      ArgumentCaptor.forClass(classOf[UnaryOperator[Pod]])
-    verify(driverPodOperations).edit(captor.capture())
-
-    val fn = captor.getValue
-    val initialPod = SparkPod.initialPod().pod
-    val editedPod = fn.apply(initialPod)
+    val podCaptor: ArgumentCaptor[Pod] = ArgumentCaptor.forClass(classOf[Pod])
+    verify(driverPodOperations).patch(any(classOf[PatchContext]), podCaptor.capture())
 
-    assert(editedPod.getMetadata.getAnnotations.get(EXIT_EXCEPTION_ANNOTATION)
+    assert(podCaptor.getValue.getMetadata.getAnnotations.get(EXIT_EXCEPTION_ANNOTATION)
       == StringUtils.stringifyException(diagnostics))
   }
 }
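
The change above swaps fabric8's read-modify-write `edit` for a server-side strategic merge patch: the client sends only the fields to merge (here, a single metadata annotation built from an empty `PodBuilder()` rather than a copy of the live pod), and the API server merges them into the existing object without the pod being fetched first. Below is a minimal standalone sketch of that call pattern, assuming a recent fabric8 `kubernetes-client` (6.x+); the namespace, pod name, and annotation key are illustrative placeholders, not the `KUBERNETES_NAMESPACE` / `EXIT_EXCEPTION_ANNOTATION` constants used in the patch.

```scala
import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.KubernetesClientBuilder
import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}

object StrategicMergePatchSketch {
  def main(args: Array[String]): Unit = {
    val client = new KubernetesClientBuilder().build()
    try {
      // Build a minimal Pod that carries only the annotation to merge in.
      // With a strategic merge patch, fields absent from this object are
      // left untouched on the live pod, so no prior GET is required.
      val patchPod = new PodBuilder()
        .withNewMetadata()
          .addToAnnotations("example.org/diagnostics", "example diagnostics") // placeholder key/value
        .endMetadata()
        .build()

      client.pods()
        .inNamespace("default")        // placeholder namespace
        .withName("my-driver-pod")     // placeholder pod name
        .patch(PatchContext.of(PatchType.STRATEGIC_MERGE), patchPod)
    } finally {
      client.close()
    }
  }
}
```

One practical consequence of this shape, visible in the test change: the mock no longer has to capture and invoke a `UnaryOperator[Pod]` against an initial pod; it can simply capture the `Pod` argument passed to `patch(PatchContext, Pod)` and assert on its annotations directly.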