diff --git a/docs/config_properties.md b/docs/config_properties.md index 7929487e..3af5b0d6 100644 --- a/docs/config_properties.md +++ b/docs/config_properties.md @@ -11,6 +11,7 @@ | spark.kubernetes.operator.api.secondaryResourceCreateMaxBackoffMillis | Long | 40000 | false | Maximum backoff (in milliseconds) between retries when creating secondary resources for Spark application. | | spark.kubernetes.operator.api.statusPatchMaxAttempts | Long | 3 | false | Maximal number of retry attempts of requests to k8s server for resource status update. This would be performed on top of k8s client spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting update on the same SparkApplication. This should be positive number. | | spark.kubernetes.operator.dynamicConfig.enabled | Boolean | false | false | When enabled, operator would use config map as source of truth for config property override. The config map need to be created in spark.kubernetes.operator.namespace, and labeled with operator name. | + | spark.kubernetes.operator.dynamicConfig.name | String | spark-kubernetes-operator-dynamic-configuration | false | The name of the config map to read the dynamic namespace configuration. | | spark.kubernetes.operator.dynamicConfig.reconcilerParallelism | Integer | 1 | false | Parallelism for dynamic config reconciler. Unbounded pool would be used if set to non-positive number. | | spark.kubernetes.operator.dynamicConfig.selector | String | app.kubernetes.io/name=spark-kubernetes-operator,app.kubernetes.io/component=operator-dynamic-config-overrides | false | The selector str applied to dynamic config map. | | spark.kubernetes.operator.health.probePort | Integer | 19091 | false | The port used for health/readiness check probe status. | diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java index 68bf5e39..13b51c38 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java @@ -261,6 +261,17 @@ public final class SparkOperatorConf { .defaultValue(Utils.labelsAsStr(Utils.defaultOperatorConfigLabels())) .build(); + /** The name of the dynamic namespace list config map. */ + public static final ConfigOption DYNAMIC_CONFIG_MAP_NAME = + ConfigOption.builder() + .key("spark.kubernetes.operator.dynamicConfig.name") + .enableDynamicOverride(false) + .description("The name of the config map to read " + + "the dynamic namespace configuration.") + .typeParameterClass(String.class) + .defaultValue("spark-kubernetes-operator-dynamic-configuration") + .build(); + /** * Parallelism for dynamic config reconciler. Unbounded pool would be used if set to non-positive * number. diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java index 184129d9..75dbe48f 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java @@ -19,6 +19,8 @@ package org.apache.spark.k8s.operator.config; +import static org.apache.spark.k8s.operator.config.SparkOperatorConf.DYNAMIC_CONFIG_MAP_NAME; + import java.util.Set; import java.util.function.Function; @@ -69,6 +71,13 @@ public ErrorStatusUpdateControl updateErrorStatus( @Override public UpdateControl reconcile(ConfigMap resource, Context context) throws Exception { + + if (!resource.getMetadata().getName().equals(DYNAMIC_CONFIG_MAP_NAME.getValue())){ + log.warn("Unexpected ConfigMap for dynamic config change: {}", + resource.getMetadata().getName()); + return UpdateControl.noUpdate(); + } + SparkOperatorConfManager.INSTANCE.refresh(resource.getData()); namespaceUpdater.apply(watchedNamespacesGetter.apply(null)); return UpdateControl.noUpdate(); diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconcilerTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconcilerTest.java index d8f81e79..21e3ce2b 100644 --- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconcilerTest.java +++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconcilerTest.java @@ -19,12 +19,22 @@ package org.apache.spark.k8s.operator.config; +import static org.apache.spark.k8s.operator.config.SparkOperatorConf.DYNAMIC_CONFIG_MAP_NAME; import static org.apache.spark.k8s.operator.config.SparkOperatorConf.RECONCILER_INTERVAL_SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.Map; +import java.util.Set; import java.util.function.Function; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -33,6 +43,8 @@ import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.Operator; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -54,7 +66,8 @@ class SparkOperatorConfigMapReconcilerTest { void startController() { var reconciler = new SparkOperatorConfigMapReconciler(mock(Function.class), mock(Function.class)); - operator = new Operator(o -> o.withKubernetesClient(client)); + operator = new Operator(o -> o.withKubernetesClient(client) + .withCloseClientOnStop(false)); operator.register(reconciler); operator.start(); } @@ -62,6 +75,7 @@ void startController() { @AfterEach void stopController() { operator.stop(); + SparkOperatorConfManager.INSTANCE.refresh(Map.of()); } @Test @@ -75,12 +89,101 @@ void sanityTest() { }); } + @Test + @SuppressWarnings("unchecked") + void reconcileSkipsConfigMapWithNonMatchingName() throws Exception { + Function, Boolean> namespaceUpdater = mock(Function.class); + Function> watchedNamespacesGetter = mock(Function.class); + SparkOperatorConfigMapReconciler reconciler = + new SparkOperatorConfigMapReconciler(namespaceUpdater, watchedNamespacesGetter); + + ConfigMap rogueCm = configMap("rogue-config-map", Map.of("rogue.key", "rogue-value")); + + UpdateControl result = reconciler.reconcile(rogueCm, mock(Context.class)); + + assertTrue(result.isNoUpdate()); + verify(namespaceUpdater, never()).apply(any()); + verify(watchedNamespacesGetter, never()).apply(any()); + // The rogue config map's data must not have leaked into the override store. + assertNull(SparkOperatorConfManager.INSTANCE.configOverrides.getProperty("rogue.key")); + } + + @Test + @SuppressWarnings("unchecked") + void reconcileRefreshesConfigForMatchingName() throws Exception { + Function, Boolean> namespaceUpdater = mock(Function.class); + Function> watchedNamespacesGetter = mock(Function.class); + Set watched = Set.of("ns1"); + when(watchedNamespacesGetter.apply(null)).thenReturn(watched); + + SparkOperatorConfigMapReconciler reconciler = + new SparkOperatorConfigMapReconciler(namespaceUpdater, watchedNamespacesGetter); + + ConfigMap goodCm = + configMap( + DYNAMIC_CONFIG_MAP_NAME.getValue(), + Map.of(RECONCILER_INTERVAL_SECONDS.getKey(), TARGET_RECONCILER_INTERVAL.toString())); + + UpdateControl result = reconciler.reconcile(goodCm, mock(Context.class)); + + assertTrue(result.isNoUpdate()); + verify(watchedNamespacesGetter, times(1)).apply(null); + verify(namespaceUpdater, times(1)).apply(watched); + assertEquals(TARGET_RECONCILER_INTERVAL, RECONCILER_INTERVAL_SECONDS.getValue()); + } + + @Test + @SuppressWarnings("unchecked") + void reconcileHonorsCustomConfiguredConfigMapName() throws Exception { + String customName = "custom-dynamic-config"; + // DYNAMIC_CONFIG_MAP_NAME has enableDynamicOverride=false, so it reads from initialConfig + // which is populated only at singleton construction time. Inject the override directly so + // we can exercise the reconciler with a non-default configured name. + Object prevValue = + SparkOperatorConfManager.INSTANCE.initialConfig.put( + DYNAMIC_CONFIG_MAP_NAME.getKey(), customName); + try { + assertEquals(customName, DYNAMIC_CONFIG_MAP_NAME.getValue()); + + Function, Boolean> namespaceUpdater = mock(Function.class); + Function> watchedNamespacesGetter = mock(Function.class); + when(watchedNamespacesGetter.apply(null)).thenReturn(Set.of()); + + SparkOperatorConfigMapReconciler reconciler = + new SparkOperatorConfigMapReconciler(namespaceUpdater, watchedNamespacesGetter); + + // Default-named ConfigMap must be ignored when a custom name is configured. + ConfigMap defaultNameCm = + configMap("spark-kubernetes-operator-dynamic-configuration", + Map.of("rogue.key", "rogue-value")); + reconciler.reconcile(defaultNameCm, mock(Context.class)); + verify(namespaceUpdater, never()).apply(any()); + + // Custom-named ConfigMap must be processed. + ConfigMap customCm = configMap(customName, Map.of()); + reconciler.reconcile(customCm, mock(Context.class)); + verify(namespaceUpdater, times(1)).apply(any()); + } finally { + if (prevValue != null) { + SparkOperatorConfManager.INSTANCE.initialConfig.put( + DYNAMIC_CONFIG_MAP_NAME.getKey(), prevValue); + } else { + SparkOperatorConfManager.INSTANCE.initialConfig.remove(DYNAMIC_CONFIG_MAP_NAME.getKey()); + } + } + } + ConfigMap testConfigMap() { + return configMap( + DYNAMIC_CONFIG_MAP_NAME.getValue(), + Map.of(RECONCILER_INTERVAL_SECONDS.getKey(), TARGET_RECONCILER_INTERVAL.toString())); + } + + private static ConfigMap configMap(String name, Map data) { ConfigMap configMap = new ConfigMap(); configMap.setMetadata( - new ObjectMetaBuilder().withName("spark-conf").withNamespace("default").build()); - configMap.setData( - Map.of(RECONCILER_INTERVAL_SECONDS.getKey(), TARGET_RECONCILER_INTERVAL.toString())); + new ObjectMetaBuilder().withName(name).withNamespace("default").build()); + configMap.setData(data); return configMap; } }