From dbdc3f0c81cd66a27a6b137f5251fc84525daa8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 28 May 2026 10:44:20 +0200 Subject: [PATCH 1/3] Check ConfigMap name for dynamic namespace loading MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- docs/config_properties.md | 1 + .../spark/k8s/operator/config/SparkOperatorConf.java | 10 ++++++++++ .../config/SparkOperatorConfigMapReconciler.java | 7 +++++++ 3 files changed, 18 insertions(+) diff --git a/docs/config_properties.md b/docs/config_properties.md index 7929487e..3af5b0d6 100644 --- a/docs/config_properties.md +++ b/docs/config_properties.md @@ -11,6 +11,7 @@ | spark.kubernetes.operator.api.secondaryResourceCreateMaxBackoffMillis | Long | 40000 | false | Maximum backoff (in milliseconds) between retries when creating secondary resources for Spark application. | | spark.kubernetes.operator.api.statusPatchMaxAttempts | Long | 3 | false | Maximal number of retry attempts of requests to k8s server for resource status update. This would be performed on top of k8s client spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting update on the same SparkApplication. This should be positive number. | | spark.kubernetes.operator.dynamicConfig.enabled | Boolean | false | false | When enabled, operator would use config map as source of truth for config property override. The config map need to be created in spark.kubernetes.operator.namespace, and labeled with operator name. | + | spark.kubernetes.operator.dynamicConfig.name | String | spark-kubernetes-operator-dynamic-configuration | false | The name of the config map to read the dynamic namespace configuration. | | spark.kubernetes.operator.dynamicConfig.reconcilerParallelism | Integer | 1 | false | Parallelism for dynamic config reconciler. Unbounded pool would be used if set to non-positive number. | | spark.kubernetes.operator.dynamicConfig.selector | String | app.kubernetes.io/name=spark-kubernetes-operator,app.kubernetes.io/component=operator-dynamic-config-overrides | false | The selector str applied to dynamic config map. | | spark.kubernetes.operator.health.probePort | Integer | 19091 | false | The port used for health/readiness check probe status. | diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java index 68bf5e39..32f23713 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java @@ -261,6 +261,16 @@ public final class SparkOperatorConf { .defaultValue(Utils.labelsAsStr(Utils.defaultOperatorConfigLabels())) .build(); + /** The name of the dynamic config map. */ + public static final ConfigOption DYNAMIC_CONFIG_MAP_NAME = + ConfigOption.builder() + .key("spark.kubernetes.operator.dynamicConfig.name") + .enableDynamicOverride(false) + .description("The name of the config map to read the dynamic namespace configuration.") + .typeParameterClass(String.class) + .defaultValue("spark-kubernetes-operator-dynamic-configuration") + .build(); + /** * Parallelism for dynamic config reconciler. Unbounded pool would be used if set to non-positive * number. diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java index 184129d9..83be8602 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java @@ -31,6 +31,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import static org.apache.spark.k8s.operator.config.SparkOperatorConf.DYNAMIC_CONFIG_MAP_NAME; + /** * This serves dynamic configuration for Spark Operator. When enabled, Operator assumes config file * is located in given config map. It would keep watch the config map and apply changes when update @@ -69,6 +71,11 @@ public ErrorStatusUpdateControl updateErrorStatus( @Override public UpdateControl reconcile(ConfigMap resource, Context context) throws Exception { + + if (!resource.getMetadata().getName().equals(DYNAMIC_CONFIG_MAP_NAME.getValue())){ + return UpdateControl.noUpdate(); + } + SparkOperatorConfManager.INSTANCE.refresh(resource.getData()); namespaceUpdater.apply(watchedNamespacesGetter.apply(null)); return UpdateControl.noUpdate(); From 573b67f168e09d51b19e76c656e4588e1a834faa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 28 May 2026 10:47:54 +0200 Subject: [PATCH 2/3] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../org/apache/spark/k8s/operator/config/SparkOperatorConf.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java index 32f23713..e431f8bf 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java @@ -261,7 +261,7 @@ public final class SparkOperatorConf { .defaultValue(Utils.labelsAsStr(Utils.defaultOperatorConfigLabels())) .build(); - /** The name of the dynamic config map. */ + /** The name of the dynamic namespace list config map. */ public static final ConfigOption DYNAMIC_CONFIG_MAP_NAME = ConfigOption.builder() .key("spark.kubernetes.operator.dynamicConfig.name") From 4e737e2ddd498aa9570d4ce7c5e304340da88b44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 28 May 2026 15:07:25 +0200 Subject: [PATCH 3/3] unit tests and logging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../operator/config/SparkOperatorConf.java | 3 +- .../SparkOperatorConfigMapReconciler.java | 6 +- .../SparkOperatorConfigMapReconcilerTest.java | 111 +++++++++++++++++- 3 files changed, 113 insertions(+), 7 deletions(-) diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java index e431f8bf..13b51c38 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java @@ -266,7 +266,8 @@ public final class SparkOperatorConf { ConfigOption.builder() .key("spark.kubernetes.operator.dynamicConfig.name") .enableDynamicOverride(false) - .description("The name of the config map to read the dynamic namespace configuration.") + .description("The name of the config map to read " + + "the dynamic namespace configuration.") .typeParameterClass(String.class) .defaultValue("spark-kubernetes-operator-dynamic-configuration") .build(); diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java index 83be8602..75dbe48f 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java @@ -19,6 +19,8 @@ package org.apache.spark.k8s.operator.config; +import static org.apache.spark.k8s.operator.config.SparkOperatorConf.DYNAMIC_CONFIG_MAP_NAME; + import java.util.Set; import java.util.function.Function; @@ -31,8 +33,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import static org.apache.spark.k8s.operator.config.SparkOperatorConf.DYNAMIC_CONFIG_MAP_NAME; - /** * This serves dynamic configuration for Spark Operator. When enabled, Operator assumes config file * is located in given config map. It would keep watch the config map and apply changes when update @@ -73,6 +73,8 @@ public UpdateControl reconcile(ConfigMap resource, Context throws Exception { if (!resource.getMetadata().getName().equals(DYNAMIC_CONFIG_MAP_NAME.getValue())){ + log.warn("Unexpected ConfigMap for dynamic config change: {}", + resource.getMetadata().getName()); return UpdateControl.noUpdate(); } diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconcilerTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconcilerTest.java index d8f81e79..21e3ce2b 100644 --- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconcilerTest.java +++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconcilerTest.java @@ -19,12 +19,22 @@ package org.apache.spark.k8s.operator.config; +import static org.apache.spark.k8s.operator.config.SparkOperatorConf.DYNAMIC_CONFIG_MAP_NAME; import static org.apache.spark.k8s.operator.config.SparkOperatorConf.RECONCILER_INTERVAL_SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.Map; +import java.util.Set; import java.util.function.Function; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -33,6 +43,8 @@ import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.Operator; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -54,7 +66,8 @@ class SparkOperatorConfigMapReconcilerTest { void startController() { var reconciler = new SparkOperatorConfigMapReconciler(mock(Function.class), mock(Function.class)); - operator = new Operator(o -> o.withKubernetesClient(client)); + operator = new Operator(o -> o.withKubernetesClient(client) + .withCloseClientOnStop(false)); operator.register(reconciler); operator.start(); } @@ -62,6 +75,7 @@ void startController() { @AfterEach void stopController() { operator.stop(); + SparkOperatorConfManager.INSTANCE.refresh(Map.of()); } @Test @@ -75,12 +89,101 @@ void sanityTest() { }); } + @Test + @SuppressWarnings("unchecked") + void reconcileSkipsConfigMapWithNonMatchingName() throws Exception { + Function, Boolean> namespaceUpdater = mock(Function.class); + Function> watchedNamespacesGetter = mock(Function.class); + SparkOperatorConfigMapReconciler reconciler = + new SparkOperatorConfigMapReconciler(namespaceUpdater, watchedNamespacesGetter); + + ConfigMap rogueCm = configMap("rogue-config-map", Map.of("rogue.key", "rogue-value")); + + UpdateControl result = reconciler.reconcile(rogueCm, mock(Context.class)); + + assertTrue(result.isNoUpdate()); + verify(namespaceUpdater, never()).apply(any()); + verify(watchedNamespacesGetter, never()).apply(any()); + // The rogue config map's data must not have leaked into the override store. + assertNull(SparkOperatorConfManager.INSTANCE.configOverrides.getProperty("rogue.key")); + } + + @Test + @SuppressWarnings("unchecked") + void reconcileRefreshesConfigForMatchingName() throws Exception { + Function, Boolean> namespaceUpdater = mock(Function.class); + Function> watchedNamespacesGetter = mock(Function.class); + Set watched = Set.of("ns1"); + when(watchedNamespacesGetter.apply(null)).thenReturn(watched); + + SparkOperatorConfigMapReconciler reconciler = + new SparkOperatorConfigMapReconciler(namespaceUpdater, watchedNamespacesGetter); + + ConfigMap goodCm = + configMap( + DYNAMIC_CONFIG_MAP_NAME.getValue(), + Map.of(RECONCILER_INTERVAL_SECONDS.getKey(), TARGET_RECONCILER_INTERVAL.toString())); + + UpdateControl result = reconciler.reconcile(goodCm, mock(Context.class)); + + assertTrue(result.isNoUpdate()); + verify(watchedNamespacesGetter, times(1)).apply(null); + verify(namespaceUpdater, times(1)).apply(watched); + assertEquals(TARGET_RECONCILER_INTERVAL, RECONCILER_INTERVAL_SECONDS.getValue()); + } + + @Test + @SuppressWarnings("unchecked") + void reconcileHonorsCustomConfiguredConfigMapName() throws Exception { + String customName = "custom-dynamic-config"; + // DYNAMIC_CONFIG_MAP_NAME has enableDynamicOverride=false, so it reads from initialConfig + // which is populated only at singleton construction time. Inject the override directly so + // we can exercise the reconciler with a non-default configured name. + Object prevValue = + SparkOperatorConfManager.INSTANCE.initialConfig.put( + DYNAMIC_CONFIG_MAP_NAME.getKey(), customName); + try { + assertEquals(customName, DYNAMIC_CONFIG_MAP_NAME.getValue()); + + Function, Boolean> namespaceUpdater = mock(Function.class); + Function> watchedNamespacesGetter = mock(Function.class); + when(watchedNamespacesGetter.apply(null)).thenReturn(Set.of()); + + SparkOperatorConfigMapReconciler reconciler = + new SparkOperatorConfigMapReconciler(namespaceUpdater, watchedNamespacesGetter); + + // Default-named ConfigMap must be ignored when a custom name is configured. + ConfigMap defaultNameCm = + configMap("spark-kubernetes-operator-dynamic-configuration", + Map.of("rogue.key", "rogue-value")); + reconciler.reconcile(defaultNameCm, mock(Context.class)); + verify(namespaceUpdater, never()).apply(any()); + + // Custom-named ConfigMap must be processed. + ConfigMap customCm = configMap(customName, Map.of()); + reconciler.reconcile(customCm, mock(Context.class)); + verify(namespaceUpdater, times(1)).apply(any()); + } finally { + if (prevValue != null) { + SparkOperatorConfManager.INSTANCE.initialConfig.put( + DYNAMIC_CONFIG_MAP_NAME.getKey(), prevValue); + } else { + SparkOperatorConfManager.INSTANCE.initialConfig.remove(DYNAMIC_CONFIG_MAP_NAME.getKey()); + } + } + } + ConfigMap testConfigMap() { + return configMap( + DYNAMIC_CONFIG_MAP_NAME.getValue(), + Map.of(RECONCILER_INTERVAL_SECONDS.getKey(), TARGET_RECONCILER_INTERVAL.toString())); + } + + private static ConfigMap configMap(String name, Map data) { ConfigMap configMap = new ConfigMap(); configMap.setMetadata( - new ObjectMetaBuilder().withName("spark-conf").withNamespace("default").build()); - configMap.setData( - Map.of(RECONCILER_INTERVAL_SECONDS.getKey(), TARGET_RECONCILER_INTERVAL.toString())); + new ObjectMetaBuilder().withName(name).withNamespace("default").build()); + configMap.setData(data); return configMap; } }