Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/config_properties.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
| spark.kubernetes.operator.api.secondaryResourceCreateMaxBackoffMillis | Long | 40000 | false | Maximum backoff (in milliseconds) between retries when creating secondary resources for Spark application. |
| spark.kubernetes.operator.api.statusPatchMaxAttempts | Long | 3 | false | Maximal number of retry attempts of requests to k8s server for resource status update. This would be performed on top of k8s client spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting update on the same SparkApplication. This should be positive number. |
| spark.kubernetes.operator.dynamicConfig.enabled | Boolean | false | false | When enabled, operator would use config map as source of truth for config property override. The config map need to be created in spark.kubernetes.operator.namespace, and labeled with operator name. |
| spark.kubernetes.operator.dynamicConfig.name | String | spark-kubernetes-operator-dynamic-configuration | false | The name of the config map to read the dynamic namespace configuration. |
| spark.kubernetes.operator.dynamicConfig.reconcilerParallelism | Integer | 1 | false | Parallelism for dynamic config reconciler. Unbounded pool would be used if set to non-positive number. |
| spark.kubernetes.operator.dynamicConfig.selector | String | app.kubernetes.io/name=spark-kubernetes-operator,app.kubernetes.io/component=operator-dynamic-config-overrides | false | The selector str applied to dynamic config map. |
| spark.kubernetes.operator.health.probePort | Integer | 19091 | false | The port used for health/readiness check probe status. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,17 @@ public final class SparkOperatorConf {
.defaultValue(Utils.labelsAsStr(Utils.defaultOperatorConfigLabels()))
.build();

/** The name of the dynamic namespace list config map. */
public static final ConfigOption<String> DYNAMIC_CONFIG_MAP_NAME =
ConfigOption.<String>builder()
.key("spark.kubernetes.operator.dynamicConfig.name")
.enableDynamicOverride(false)
.description("The name of the config map to read " +
"the dynamic namespace configuration.")
.typeParameterClass(String.class)
.defaultValue("spark-kubernetes-operator-dynamic-configuration")
.build();

/**
* Parallelism for dynamic config reconciler. Unbounded pool would be used if set to non-positive
* number.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.spark.k8s.operator.config;

import static org.apache.spark.k8s.operator.config.SparkOperatorConf.DYNAMIC_CONFIG_MAP_NAME;

import java.util.Set;
import java.util.function.Function;

Expand Down Expand Up @@ -69,6 +71,13 @@ public ErrorStatusUpdateControl<ConfigMap> updateErrorStatus(
@Override
public UpdateControl<ConfigMap> reconcile(ConfigMap resource, Context<ConfigMap> context)
throws Exception {

if (!resource.getMetadata().getName().equals(DYNAMIC_CONFIG_MAP_NAME.getValue())){
log.warn("Unexpected ConfigMap for dynamic config change: {}",
resource.getMetadata().getName());
return UpdateControl.noUpdate();
}

SparkOperatorConfManager.INSTANCE.refresh(resource.getData());
namespaceUpdater.apply(watchedNamespacesGetter.apply(null));
return UpdateControl.noUpdate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,22 @@

package org.apache.spark.k8s.operator.config;

import static org.apache.spark.k8s.operator.config.SparkOperatorConf.DYNAMIC_CONFIG_MAP_NAME;
import static org.apache.spark.k8s.operator.config.SparkOperatorConf.RECONCILER_INTERVAL_SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand All @@ -33,6 +43,8 @@
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -54,14 +66,16 @@ class SparkOperatorConfigMapReconcilerTest {
void startController() {
var reconciler =
new SparkOperatorConfigMapReconciler(mock(Function.class), mock(Function.class));
operator = new Operator(o -> o.withKubernetesClient(client));
operator = new Operator(o -> o.withKubernetesClient(client)
.withCloseClientOnStop(false));
operator.register(reconciler);
operator.start();
}

@AfterEach
void stopController() {
operator.stop();
SparkOperatorConfManager.INSTANCE.refresh(Map.of());
}

@Test
Expand All @@ -75,12 +89,101 @@ void sanityTest() {
});
}

@Test
@SuppressWarnings("unchecked")
void reconcileSkipsConfigMapWithNonMatchingName() throws Exception {
Function<Set<String>, Boolean> namespaceUpdater = mock(Function.class);
Function<Void, Set<String>> watchedNamespacesGetter = mock(Function.class);
SparkOperatorConfigMapReconciler reconciler =
new SparkOperatorConfigMapReconciler(namespaceUpdater, watchedNamespacesGetter);

ConfigMap rogueCm = configMap("rogue-config-map", Map.of("rogue.key", "rogue-value"));

UpdateControl<ConfigMap> result = reconciler.reconcile(rogueCm, mock(Context.class));

assertTrue(result.isNoUpdate());
verify(namespaceUpdater, never()).apply(any());
verify(watchedNamespacesGetter, never()).apply(any());
// The rogue config map's data must not have leaked into the override store.
assertNull(SparkOperatorConfManager.INSTANCE.configOverrides.getProperty("rogue.key"));
}

@Test
@SuppressWarnings("unchecked")
void reconcileRefreshesConfigForMatchingName() throws Exception {
Function<Set<String>, Boolean> namespaceUpdater = mock(Function.class);
Function<Void, Set<String>> watchedNamespacesGetter = mock(Function.class);
Set<String> watched = Set.of("ns1");
when(watchedNamespacesGetter.apply(null)).thenReturn(watched);

SparkOperatorConfigMapReconciler reconciler =
new SparkOperatorConfigMapReconciler(namespaceUpdater, watchedNamespacesGetter);

ConfigMap goodCm =
configMap(
DYNAMIC_CONFIG_MAP_NAME.getValue(),
Map.of(RECONCILER_INTERVAL_SECONDS.getKey(), TARGET_RECONCILER_INTERVAL.toString()));

UpdateControl<ConfigMap> result = reconciler.reconcile(goodCm, mock(Context.class));

assertTrue(result.isNoUpdate());
verify(watchedNamespacesGetter, times(1)).apply(null);
verify(namespaceUpdater, times(1)).apply(watched);
assertEquals(TARGET_RECONCILER_INTERVAL, RECONCILER_INTERVAL_SECONDS.getValue());
}

@Test
@SuppressWarnings("unchecked")
void reconcileHonorsCustomConfiguredConfigMapName() throws Exception {
String customName = "custom-dynamic-config";
// DYNAMIC_CONFIG_MAP_NAME has enableDynamicOverride=false, so it reads from initialConfig
// which is populated only at singleton construction time. Inject the override directly so
// we can exercise the reconciler with a non-default configured name.
Object prevValue =
SparkOperatorConfManager.INSTANCE.initialConfig.put(
DYNAMIC_CONFIG_MAP_NAME.getKey(), customName);
try {
assertEquals(customName, DYNAMIC_CONFIG_MAP_NAME.getValue());

Function<Set<String>, Boolean> namespaceUpdater = mock(Function.class);
Function<Void, Set<String>> watchedNamespacesGetter = mock(Function.class);
when(watchedNamespacesGetter.apply(null)).thenReturn(Set.of());

SparkOperatorConfigMapReconciler reconciler =
new SparkOperatorConfigMapReconciler(namespaceUpdater, watchedNamespacesGetter);

// Default-named ConfigMap must be ignored when a custom name is configured.
ConfigMap defaultNameCm =
configMap("spark-kubernetes-operator-dynamic-configuration",
Map.of("rogue.key", "rogue-value"));
reconciler.reconcile(defaultNameCm, mock(Context.class));
verify(namespaceUpdater, never()).apply(any());

// Custom-named ConfigMap must be processed.
ConfigMap customCm = configMap(customName, Map.of());
reconciler.reconcile(customCm, mock(Context.class));
verify(namespaceUpdater, times(1)).apply(any());
} finally {
if (prevValue != null) {
SparkOperatorConfManager.INSTANCE.initialConfig.put(
DYNAMIC_CONFIG_MAP_NAME.getKey(), prevValue);
} else {
SparkOperatorConfManager.INSTANCE.initialConfig.remove(DYNAMIC_CONFIG_MAP_NAME.getKey());
}
}
}

ConfigMap testConfigMap() {
return configMap(
DYNAMIC_CONFIG_MAP_NAME.getValue(),
Map.of(RECONCILER_INTERVAL_SECONDS.getKey(), TARGET_RECONCILER_INTERVAL.toString()));
}

private static ConfigMap configMap(String name, Map<String, String> data) {
ConfigMap configMap = new ConfigMap();
configMap.setMetadata(
new ObjectMetaBuilder().withName("spark-conf").withNamespace("default").build());
configMap.setData(
Map.of(RECONCILER_INTERVAL_SECONDS.getKey(), TARGET_RECONCILER_INTERVAL.toString()));
new ObjectMetaBuilder().withName(name).withNamespace("default").build());
configMap.setData(data);
return configMap;
}
}
Loading