Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ jobs:
- resource-retain-duration
- resource-retain-on-failure
- resource-selector
- watched-namespaces
- driver-start-timeout
exclude:
- mode: dynamic
Expand All @@ -113,8 +112,6 @@ jobs:
test-group: resource-selector
- mode: dynamic
test-group: driver-start-timeout
- mode: static
test-group: watched-namespaces
- mode: static
test-group: resource-selector
- mode: selector
Expand All @@ -127,8 +124,6 @@ jobs:
test-group: resource-retain-duration
- mode: selector
test-group: resource-retain-on-failure
- mode: selector
test-group: watched-namespaces
- mode: selector
test-group: driver-start-timeout
include:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand All @@ -37,7 +36,6 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.http.Interceptor;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.RegisteredController;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -73,7 +71,6 @@ public class SparkOperator {
private final SparkClusterSubmissionWorker clusterSubmissionWorker;
private final SparkAppStatusRecorder sparkAppStatusRecorder;
private final SparkClusterStatusRecorder sparkClusterStatusRecorder;
protected Set<RegisteredController<?>> registeredSparkControllers;
protected Set<String> watchedNamespaces;
private final MetricsSystem metricsSystem;
private final SentinelManager<SparkApplication> sparkApplicationSentinelManager;
Expand All @@ -95,7 +92,6 @@ public SparkOperator() {
this.sparkAppStatusRecorder =
new SparkAppStatusRecorder(getAppStatusListener(), recorderSource);
this.sparkClusterStatusRecorder = new SparkClusterStatusRecorder(getClusterStatusListener());
this.registeredSparkControllers = new HashSet<>();
this.watchedNamespaces = getWatchedNamespaces();
this.sparkApplicationSentinelManager = new SentinelManager<>();
this.sparkClusterSentinelManager = new SentinelManager<>();
Expand Down Expand Up @@ -161,16 +157,14 @@ static void runSystemGc() {
*/
protected Operator registerSparkOperator() {
Operator op = new Operator(this::overrideOperatorConfigs);
registeredSparkControllers.add(
op.register(
new SparkAppReconciler(
appSubmissionWorker, sparkAppStatusRecorder, sparkApplicationSentinelManager),
this::overrideControllerConfigs));
registeredSparkControllers.add(
op.register(
new SparkClusterReconciler(
clusterSubmissionWorker, sparkClusterStatusRecorder, sparkClusterSentinelManager),
this::overrideControllerConfigs));
op.register(
new SparkAppReconciler(
appSubmissionWorker, sparkAppStatusRecorder, sparkApplicationSentinelManager),
this::overrideControllerConfigs);
op.register(
new SparkClusterReconciler(
clusterSubmissionWorker, sparkClusterStatusRecorder, sparkClusterSentinelManager),
this::overrideControllerConfigs);
return op;
}

Expand All @@ -188,8 +182,7 @@ protected Operator registerSparkOperatorConfMonitor() {
operatorNamespace,
confSelector);
op.register(
new SparkOperatorConfigMapReconciler(
this::updateWatchingNamespaces, unused -> getWatchedNamespaces()),
new SparkOperatorConfigMapReconciler(),
c -> {
c.withRateLimiter(SparkOperatorConf.getOperatorRateLimiter());
c.settingNamespaces(operatorNamespace);
Expand All @@ -198,38 +191,6 @@ protected Operator registerSparkOperatorConfMonitor() {
return op;
}

/**
* Updates the set of namespaces that the operator is watching.
*
* @param namespaces The new set of namespaces to watch.
* @return True if the namespaces were updated, false otherwise.
*/
protected boolean updateWatchingNamespaces(Set<String> namespaces) {
if (watchedNamespaces.equals(namespaces)) {
log.info("No watched namespace change detected");
return false;
}
if (watchedNamespaces.isEmpty()) {
log.info("Cannot update watch namespaces for operator started at cluster level.");
return false;
}
if (namespaces == null || namespaces.isEmpty()) {
log.error("Cannot updating namespaces to empty");
return false;
}
registeredSparkControllers.forEach(
c -> {
if (c.allowsNamespaceChanges()) {
log.info("Updating operator namespaces to {}", namespaces);
c.changeNamespaces(namespaces);
} else {
log.error("Controller does not allow namespace change, skipping namespace change.");
}
});
this.watchedNamespaces = new HashSet<>(namespaces);
return true;
}

/**
* Overrides the default configuration for the operator.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,23 @@

package org.apache.spark.k8s.operator.config;

import java.util.Set;
import java.util.function.Function;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* This serves dynamic configuration for Spark Operator. When enabled, Operator assumes config file
* is located in given config map. It would keep watch the config map and apply changes when update
* is detected.
* is detected. The set of watched namespaces is intentionally excluded from dynamic refresh and can
* only be changed by restarting the operator with updated Helm values.
*/
@ControllerConfiguration
@RequiredArgsConstructor
@Slf4j
public class SparkOperatorConfigMapReconciler implements Reconciler<ConfigMap> {
private final Function<Set<String>, Boolean> namespaceUpdater;
private final Function<Void, Set<String>> watchedNamespacesGetter;

/**
* Updates the error status of the ConfigMap reconciliation.
Expand Down Expand Up @@ -70,7 +64,6 @@ public ErrorStatusUpdateControl<ConfigMap> updateErrorStatus(
public UpdateControl<ConfigMap> reconcile(ConfigMap resource, Context<ConfigMap> context)
throws Exception {
SparkOperatorConfManager.INSTANCE.refresh(resource.getData());
namespaceUpdater.apply(watchedNamespacesGetter.apply(null));
return UpdateControl.noUpdate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,12 @@
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import java.util.Set;
import java.util.function.Consumer;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.RegisteredController;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;
Expand All @@ -49,7 +46,6 @@
import org.apache.spark.k8s.operator.metrics.source.KubernetesMetricsInterceptor;
import org.apache.spark.k8s.operator.probe.ProbeService;
import org.apache.spark.k8s.operator.reconciler.SparkAppReconciler;
import org.apache.spark.k8s.operator.reconciler.SparkClusterReconciler;
import org.apache.spark.k8s.operator.utils.Utils;

class SparkOperatorTest {
Expand Down Expand Up @@ -85,8 +81,7 @@ void testOperatorConstructionWithDynamicConfigEnabled() {
.thenReturn(mockClient);
mockUtils.when(Utils::getWatchedNamespaces).thenReturn(Set.of("namespace-1"));

SparkOperator sparkOperator = new SparkOperator();
Assertions.assertEquals(1, sparkOperator.registeredSparkControllers.size());
new SparkOperator();
Assertions.assertEquals(2, operatorConstruction.constructed().size());
Assertions.assertEquals(1, sparkAppReconcilerConstruction.constructed().size());
Assertions.assertEquals(1, configReconcilerConstruction.constructed().size());
Expand Down Expand Up @@ -128,8 +123,7 @@ void testOperatorConstructionWithDynamicConfigDisabled() {
mockKubernetesClientFactory
.when(() -> KubernetesClientFactory.buildKubernetesClient(any()))
.thenReturn(mockClient);
SparkOperator sparkOperator = new SparkOperator();
Assertions.assertEquals(1, sparkOperator.registeredSparkControllers.size());
new SparkOperator();
Assertions.assertEquals(1, operatorConstruction.constructed().size());
Assertions.assertEquals(1, sparkAppReconcilerConstruction.constructed().size());
Assertions.assertEquals(1, probeServiceConstruction.constructed().size());
Expand All @@ -140,57 +134,4 @@ void testOperatorConstructionWithDynamicConfigDisabled() {
setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, dynamicConfigEnabled);
}
}

@SuppressWarnings("PMD.UnusedLocalVariable")
@Test
void testUpdateWatchedNamespacesWithDynamicConfigEnabled() {
MetricsSystem mockMetricsSystem = mock(MetricsSystem.class);
KubernetesClient mockClient = mock(KubernetesClient.class);
var registeredController = mock(RegisteredController.class);
when(registeredController.allowsNamespaceChanges()).thenReturn(true);
boolean dynamicConfigEnabled = SparkOperatorConf.DYNAMIC_CONFIG_ENABLED.getValue();

try (MockedStatic<MetricsSystemFactory> mockMetricsSystemFactory =
mockStatic(MetricsSystemFactory.class);
MockedStatic<KubernetesClientFactory> mockKubernetesClientFactory =
mockStatic(KubernetesClientFactory.class);
MockedStatic<Utils> mockUtils = mockStatic(Utils.class);
MockedConstruction<Operator> operatorConstruction =
mockConstruction(
Operator.class,
(mock, context) -> {
when(mock.register(any(SparkAppReconciler.class), any(Consumer.class)))
.thenReturn(registeredController);
when(mock.register(any(SparkClusterReconciler.class), any(Consumer.class)))
.thenReturn(registeredController);
});
MockedConstruction<SparkAppReconciler> sparkAppReconcilerConstruction =
mockConstruction(SparkAppReconciler.class);
MockedConstruction<SparkOperatorConfigMapReconciler> configReconcilerConstruction =
mockConstruction(SparkOperatorConfigMapReconciler.class);
MockedConstruction<ProbeService> probeServiceConstruction =
mockConstruction(ProbeService.class);
MockedConstruction<MetricsService> metricsServiceConstruction =
mockConstruction(MetricsService.class);
MockedConstruction<KubernetesMetricsInterceptor> interceptorMockedConstruction =
mockConstruction(KubernetesMetricsInterceptor.class)) {
setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, true);
mockMetricsSystemFactory
.when(MetricsSystemFactory::createMetricsSystem)
.thenReturn(mockMetricsSystem);
mockKubernetesClientFactory
.when(() -> KubernetesClientFactory.buildKubernetesClient(any()))
.thenReturn(mockClient);
mockUtils.when(Utils::getWatchedNamespaces).thenReturn(Set.of("namespace-1"));
SparkOperator sparkOperator = new SparkOperator();
Set<String> updatedNamespaces = Set.of("namespace-1", "namespace-2");
Assertions.assertTrue(sparkOperator.updateWatchingNamespaces(updatedNamespaces));
Assertions.assertEquals(updatedNamespaces, sparkOperator.watchedNamespaces);
verify(registeredController).allowsNamespaceChanges();
verify(registeredController).changeNamespaces(updatedNamespaces);
verifyNoMoreInteractions(registeredController);
} finally {
setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, dynamicConfigEnabled);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import static org.apache.spark.k8s.operator.config.SparkOperatorConf.RECONCILER_INTERVAL_SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;

import java.util.Map;
import java.util.function.Function;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.fabric8.kubeapitest.junit.EnableKubeAPIServer;
Expand All @@ -50,10 +48,8 @@ class SparkOperatorConfigMapReconcilerTest {
Operator operator;

@BeforeEach
@SuppressWarnings("unchecked")
void startController() {
var reconciler =
new SparkOperatorConfigMapReconciler(mock(Function.class), mock(Function.class));
var reconciler = new SparkOperatorConfigMapReconciler();
operator = new Operator(o -> o.withKubernetesClient(client));
operator.register(reconciler);
operator.start();
Expand Down
39 changes: 0 additions & 39 deletions tests/e2e/helm/dynamic-config-values-2.yaml

This file was deleted.

Loading
Loading