diff --git a/artemis-lockmanager/artemis-kube-lock-all/pom.xml b/artemis-lockmanager/artemis-kube-lock-all/pom.xml
new file mode 100644
index 00000000000..0f3cbfcc5c0
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock-all/pom.xml
@@ -0,0 +1,209 @@
+
+
+ 4.0.0
+
+
+ org.apache.artemis
+ artemis-lockmanager
+ 2.54.0-SNAPSHOT
+
+
+ artemis-kube-lock-all
+ jar
+ Kube Lock Manager (all dependencies)
+
+
+ ${project.basedir}/../..
+
+
+
+
+ org.apache.artemis
+ artemis-kube-lock
+ ${project.version}
+ true
+
+
+
+
+ org.bouncycastle
+ bcpkix-jdk18on
+ ${bc-java-version}
+ compile
+
+
+ org.bouncycastle
+ bcprov-jdk18on
+ ${bc-java-version}
+ compile
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ package
+
+ shade
+
+
+ true
+ false
+ false
+
+
+ *:*
+
+
+
+ org.apache.artemis:artemis-lockmanager-api
+
+
+
+
+
+ io.kubernetes
+ artemis.kube.shade.io.kubernetes
+
+
+ io.swagger
+ artemis.kube.shade.io.swagger
+
+
+ io.gapi
+ artemis.kube.shade.io.gapi
+
+
+ io.sundr
+ artemis.kube.shade.io.sundr
+
+
+ io.gsonfire
+ artemis.kube.shade.io.gsonfire
+
+
+ com.google
+ artemis.kube.shade.com.google
+
+
+ com.squareup
+ artemis.kube.shade.com.squareup
+
+
+ com.fasterxml
+ artemis.kube.shade.com.fasterxml
+
+
+ okhttp3
+ artemis.kube.shade.okhttp3
+
+
+ okio
+ artemis.kube.shade.okio
+
+
+ org.yaml
+ artemis.kube.shade.org.yaml
+
+
+ org.joda
+ artemis.kube.shade.org.joda
+
+
+ org.threeten
+ artemis.kube.shade.org.threeten
+
+
+ org.checkerframework
+ artemis.kube.shade.org.checkerframework
+
+
+ javax.annotation
+ artemis.kube.shade.javax.annotation
+
+
+ kotlin
+ artemis.kube.shade.kotlin
+
+
+ org.jetbrains
+ artemis.kube.shade.org.jetbrains
+
+
+ org.intellij
+ artemis.kube.shade.org.intellij
+
+
+ org.apache.commons
+ artemis.kube.shade.org.apache.commons
+
+
+ org.slf4j
+ artemis.kube.shade.org.slf4j
+
+
+ jakarta
+ artemis.kube.shade.jakarta
+
+
+ org.jose4j
+ artemis.kube.shade.org.jose4j
+
+
+ com.github
+ artemis.kube.shade.com.github
+
+
+ io.github
+ artemis.kube.shade.io.github
+
+
+ org.jspecify
+ artemis.kube.shade.org.jspecify
+
+
+ org.bouncycastle
+ artemis.kube.shade.org.bouncycastle
+
+
+
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+
+
diff --git a/artemis-lockmanager/artemis-kube-lock/pom.xml b/artemis-lockmanager/artemis-kube-lock/pom.xml
new file mode 100644
index 00000000000..52ac7ccb962
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/pom.xml
@@ -0,0 +1,63 @@
+
+
+ 4.0.0
+
+
+ org.apache.artemis
+ artemis-lockmanager
+ 2.54.0-SNAPSHOT
+
+
+ artemis-kube-lock
+ bundle
+ Kubernetes Lock Manager
+
+
+ ${project.basedir}/../..
+
+
+
+
+ org.apache.artemis
+ artemis-lockmanager-api
+
+
+
+ io.kubernetes
+ client-java
+ ${kubernetes.client.version}
+
+
+
+
+ io.kubernetes
+ client-java-extended
+ ${kubernetes.client.version}
+
+
+
+
+ org.bouncycastle
+ bcpkix-jdk18on
+ 1.84
+ compile
+
+
+
+
+
diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLock.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLock.java
new file mode 100644
index 00000000000..ae0f1340689
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLock.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.lock.kube;
+
+import java.io.FileReader;
+import java.lang.invoke.MethodHandles;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.CoordinationV1Api;
+import io.kubernetes.client.openapi.models.V1Lease;
+import io.kubernetes.client.openapi.models.V1LeaseSpec;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.util.ClientBuilder;
+import io.kubernetes.client.util.KubeConfig;
+import org.apache.activemq.artemis.lockmanager.DistributedLock;
+import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubeLock implements DistributedLock {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ final String hostname;
+ final String namespace;
+ final String id;
+ private final long leasePeriodSeconds;
+
+ private ApiClient kubeClient;
+
+ private CoordinationV1Api coordinationV1Api;
+
+
+ public KubeLock(String hostname, String namespace, String id, long leasePeriodSeconds) throws Exception {
+ this.hostname = hostname;
+ this.namespace = namespace;
+ this.id = id;
+ String kubeconfigPath = System.getenv("KUBECONFIG");
+ if (kubeconfigPath == null) {
+ kubeconfigPath = System.getProperty("user.home") + "/.kube/config";
+ }
+ logger.debug("using kubeLock client as {}", kubeconfigPath);
+ kubeClient = ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(new FileReader(kubeconfigPath))).build();
+ coordinationV1Api = new CoordinationV1Api(kubeClient);
+ this.leasePeriodSeconds = leasePeriodSeconds;
+ }
+
+ @Override
+ public String getLockId() {
+ return id;
+ }
+
+ @Override
+ public boolean isHeldByCaller() throws UnavailableStateException {
+ return renewLock();
+ }
+
+ private boolean renewLock() throws UnavailableStateException {
+ try {
+ // Try to read the existing lease
+ V1Lease existingLease = coordinationV1Api.readNamespacedLease(id, namespace).execute();
+ logger.debug("renewLock, Read lease: renewTime={}, holderIdentity={}, leaseDuration={}",
+ existingLease.getSpec().getRenewTime(),
+ existingLease.getSpec().getHolderIdentity(),
+ existingLease.getSpec().getLeaseDurationSeconds());
+
+ // Check if we already hold this lease
+ if (hostname.equals(existingLease.getSpec().getHolderIdentity())) {
+ // Renew the lease
+ existingLease.getSpec().setRenewTime(java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC));
+ coordinationV1Api.replaceNamespacedLease(id, namespace, existingLease).execute();
+ return true;
+ }
+
+
+ // Check if the lease has expired by using the leaseDurationSeconds from the lease spec
+ // This is more reliable than using our local leasePeriodSeconds
+ java.time.OffsetDateTime renewTime = existingLease.getSpec().getRenewTime();
+ Integer leaseDuration = existingLease.getSpec().getLeaseDurationSeconds();
+
+ if (renewTime != null && leaseDuration != null) {
+ OffsetDateTime now = java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("renew period:: {}, now = {}, between={}, between seconds={}", renewTime, now, Duration.between(renewTime, now), Duration.between(renewTime, now).toSeconds());
+ }
+ long ageSeconds = java.time.Duration.between(renewTime, now).toSeconds();
+
+ if (ageSeconds > leaseDuration) {
+ // Lease has expired, try to acquire it
+ OffsetDateTime newRenewTime = java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC);
+ existingLease.getSpec().setHolderIdentity(hostname);
+ existingLease.getSpec().setAcquireTime(java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC));
+ existingLease.getSpec().setRenewTime(newRenewTime);
+
+ logger.debug("acquiring expired lease. Setting renewTime = {}, holderIdentity = {}, leaseDuration = {}", newRenewTime, hostname, leasePeriodSeconds);
+
+ existingLease.getSpec().setLeaseDurationSeconds((int) (leasePeriodSeconds));
+ coordinationV1Api.replaceNamespacedLease(id, namespace, existingLease).execute();
+ return true;
+ }
+ }
+
+ // Lease is held by someone else and not expired
+ return false;
+
+ } catch (ApiException e) {
+ if (e.getCode() == 404) {
+ logger.debug("Create lock");
+ return createLock();
+ } else {
+ logger.warn(e.getMessage(), e);
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public boolean tryLock() throws UnavailableStateException {
+ return renewLock();
+ }
+
+ private boolean createLock() throws UnavailableStateException {
+ // Lease doesn't exist, create a new one
+ try {
+ V1Lease newLease = new V1Lease();
+ newLease.setApiVersion("coordination.k8s.io/v1");
+ newLease.setKind("Lease");
+
+ V1ObjectMeta metadata = new V1ObjectMeta();
+ metadata.setName(id);
+ metadata.setNamespace(namespace);
+ newLease.setMetadata(metadata);
+
+ V1LeaseSpec spec = new V1LeaseSpec();
+ spec.setHolderIdentity(hostname);
+ spec.setAcquireTime(java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC));
+
+ spec.setRenewTime(java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC));
+ spec.setLeaseDurationSeconds((int) (leasePeriodSeconds));
+ newLease.setSpec(spec);
+
+ coordinationV1Api.createNamespacedLease(namespace, newLease).execute();
+ return true;
+ } catch (ApiException createException) {
+ createException.printStackTrace();
+ // Race condition - someone else created it first
+ logger.warn(createException.getMessage(), createException);
+ return false;
+ }
+ }
+
+ @Override
+ public void unlock() throws UnavailableStateException {
+ try {
+ // Try to read the existing lease
+ V1Lease existingLease = coordinationV1Api.readNamespacedLease(id, namespace).execute();
+
+ // Only unlock if we hold the lease
+ if (hostname.equals(existingLease.getSpec().getHolderIdentity())) {
+ // Delete the lease to release the lock
+ coordinationV1Api.deleteNamespacedLease(id, namespace).execute();
+ logger.debug("Released lock: {}", id);
+ } else {
+ logger.warn("Attempted to unlock {} but it's held by {}", id, existingLease.getSpec().getHolderIdentity());
+ }
+ } catch (ApiException e) {
+ if (e.getCode() == 404) {
+ // Lease doesn't exist - already unlocked or expired
+ logger.debug("Lock {} not found, already released", id);
+ } else {
+ throw new UnavailableStateException("Failed to unlock: " + e.getResponseBody(), e);
+ }
+ }
+ }
+
+ @Override
+ public void addListener(UnavailableLockListener listener) {
+
+ }
+
+ @Override
+ public void removeListener(UnavailableLockListener listener) {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLockManager.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLockManager.java
new file mode 100644
index 00000000000..aedf172fd6c
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLockManager.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.lock.kube;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.activemq.artemis.lockmanager.AbstractDistributedLockManager;
+import org.apache.activemq.artemis.lockmanager.DistributedLock;
+import org.apache.activemq.artemis.lockmanager.MutableLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Kubernetes-based distributed lock manager implementation.
+ *
+ * Configuration parameters:
+ *
+ * - hostname: The hostname identifier for this instance.
+ * Default: value of the HOSTNAME environment variable
+ * - namespace: The Kubernetes namespace where locks are managed.
+ * Default: reads from /var/run/secrets/kubernetes.io/serviceaccount/namespace,
+ * falls back to "default" if unavailable
+ * - lease-timeout: The lease timeout in seconds.
+ * Default: 30 seconds
+ *
+ */
+public class KubeLockManager extends AbstractDistributedLockManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String HOSTNAME = "hostname";
+ private static final String NAMESPACE = "namespace";
+ private static final String LEASE_TIMEOUT = "lease-timeout";
+ private static final Set VALID_PARAMS = Stream.of(
+ HOSTNAME,
+ NAMESPACE, LEASE_TIMEOUT).collect(Collectors.toSet());
+ private static final String VALID_PARAMS_ON_ERROR = VALID_PARAMS.stream().collect(joining(","));
+
+ private String hostname;
+ private String namespace;
+ long leaseTimeout;
+
+
+
+ public KubeLockManager(Map config) {
+ this(config.get(HOSTNAME),
+ config.get(NAMESPACE),
+ Long.parseLong(config.get(LEASE_TIMEOUT)));
+ validateParameters(config);
+ }
+
+ public KubeLockManager(String hostname, String namespace, long leaseTimeout) {
+ this.hostname = hostname;
+ this.namespace = namespace;
+ this.leaseTimeout = leaseTimeout;
+
+ if (hostname == null) {
+ hostname = System.getenv("HOSTNAME");
+ logger.debug("Replaced hostname attribute as {}", hostname);
+ }
+
+ if (namespace == null) {
+ try {
+ namespace = Files.readString(Path.of("/var/run/secrets/kubernetes.io/serviceaccount/namespace")).trim();
+ logger.debug("Read namespace from Kubernetes service account: {}", namespace);
+ } catch (IOException e) {
+ logger.warn(e.getMessage(), e);
+ namespace = "default";
+ }
+ }
+
+ if (leaseTimeout <= 0) {
+ leaseTimeout = 30;
+ }
+ }
+
+
+ @Override
+ protected Set getValidParams() {
+ return VALID_PARAMS;
+ }
+
+ @Override
+ public void addUnavailableManagerListener(UnavailableManagerListener listener) {
+
+ }
+
+ @Override
+ public void removeUnavailableManagerListener(UnavailableManagerListener listener) {
+
+ }
+
+ @Override
+ public boolean start(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
+ return true;
+ }
+
+ @Override
+ public void start() throws InterruptedException, ExecutionException {
+ }
+
+ @Override
+ public boolean isStarted() {
+ return true;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public DistributedLock getDistributedLock(String lockId) throws InterruptedException, ExecutionException, TimeoutException {
+ try {
+ return new KubeLock(hostname, namespace, lockId, leaseTimeout);
+ } catch (Exception e) {
+ throw new ExecutionException(e.getMessage(), e);
+ }
+
+ }
+
+ @Override
+ public MutableLong getMutableLong(String mutableLongId) throws InterruptedException, ExecutionException, TimeoutException {
+ try {
+ return new KubeMutableLong(namespace, mutableLongId);
+ } catch (Exception e) {
+ throw new ExecutionException(e.getMessage(), e);
+ }
+ }
+}
diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeMutableLong.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeMutableLong.java
new file mode 100644
index 00000000000..993faf61322
--- /dev/null
+++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeMutableLong.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.artemis.lock.kube;
+
+import java.io.FileReader;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1ConfigMap;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.util.ClientBuilder;
+import io.kubernetes.client.util.KubeConfig;
+import org.apache.activemq.artemis.lockmanager.MutableLong;
+import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubeMutableLong implements MutableLong {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final String VALUE_KEY = "value";
+
+ private final String namespace;
+ private final String id;
+ private final String configMapName;
+ private final ApiClient kubeClient;
+ private final CoreV1Api coreV1Api;
+
+ public KubeMutableLong(String namespace, String id) throws Exception {
+ this.namespace = namespace;
+ this.id = id;
+ this.configMapName = sanitizeConfigMapName(id);
+ String kubeconfigPath = System.getenv("KUBECONFIG");
+ if (kubeconfigPath == null) {
+ kubeconfigPath = System.getProperty("user.home") + "/.kube/config";
+ }
+ kubeClient = ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(new FileReader(kubeconfigPath))).build();
+ coreV1Api = new CoreV1Api(kubeClient);
+ }
+
+ private static String sanitizeConfigMapName(String name) {
+ return name.toLowerCase().replaceAll("[^a-z0-9.-]", "-");
+ }
+
+ @Override
+ public String getMutableLongId() {
+ return id;
+ }
+
+ @Override
+ public long get() throws UnavailableStateException {
+ try {
+ V1ConfigMap configMap = coreV1Api.readNamespacedConfigMap(configMapName, namespace).execute();
+ Map data = configMap.getData();
+ if (data == null || !data.containsKey(VALUE_KEY)) {
+ return 0L;
+ }
+ return Long.parseLong(data.get(VALUE_KEY));
+ } catch (ApiException e) {
+ if (e.getCode() == 404) {
+ return 0L;
+ }
+ throw new UnavailableStateException("Failed to get mutable long value: " + e.getResponseBody(), e);
+ } catch (NumberFormatException e) {
+ throw new UnavailableStateException("Invalid long value stored in ConfigMap", e);
+ }
+ }
+
+ @Override
+ public void set(long value) throws UnavailableStateException {
+ try {
+ V1ConfigMap existingConfigMap;
+ boolean exists = true;
+
+ try {
+ existingConfigMap = coreV1Api.readNamespacedConfigMap(configMapName, namespace).execute();
+ } catch (ApiException e) {
+ if (e.getCode() == 404) {
+ exists = false;
+ existingConfigMap = null;
+ } else {
+ throw e;
+ }
+ }
+
+ if (exists) {
+ Map data = existingConfigMap.getData();
+ if (data == null) {
+ data = new HashMap<>();
+ existingConfigMap.setData(data);
+ }
+ data.put(VALUE_KEY, String.valueOf(value));
+ coreV1Api.replaceNamespacedConfigMap(configMapName, namespace, existingConfigMap).execute();
+ } else {
+ V1ConfigMap newConfigMap = new V1ConfigMap();
+ newConfigMap.setApiVersion("v1");
+ newConfigMap.setKind("ConfigMap");
+
+ V1ObjectMeta metadata = new V1ObjectMeta();
+ metadata.setName(configMapName);
+ metadata.setNamespace(namespace);
+ newConfigMap.setMetadata(metadata);
+
+ Map data = new HashMap<>();
+ data.put(VALUE_KEY, String.valueOf(value));
+ newConfigMap.setData(data);
+
+ coreV1Api.createNamespacedConfigMap(namespace, newConfigMap).execute();
+ }
+ } catch (ApiException e) {
+ throw new UnavailableStateException("Failed to set mutable long value: " + e.getResponseBody(), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ // No cleanup needed for now
+ }
+}
diff --git a/artemis-lockmanager/artemis-lockmanager-api/pom.xml b/artemis-lockmanager/artemis-lockmanager-api/pom.xml
index 874449be2bb..f8c9a45b101 100644
--- a/artemis-lockmanager/artemis-lockmanager-api/pom.xml
+++ b/artemis-lockmanager/artemis-lockmanager-api/pom.xml
@@ -31,11 +31,4 @@
${project.basedir}/../..
-
-
- org.apache.artemis
- artemis-commons
-
-
-
diff --git a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/AbstractDistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/AbstractDistributedLockManager.java
new file mode 100644
index 00000000000..2204b10e500
--- /dev/null
+++ b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/AbstractDistributedLockManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.lockmanager;
+
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.stream.Collectors.joining;
+
+public abstract class AbstractDistributedLockManager implements DistributedLockManager {
+
+ public AbstractDistributedLockManager() {
+ }
+
+ public AbstractDistributedLockManager(Map properties) {
+ validateParameters(properties);
+ }
+
+ protected String commaOnParameters() {
+ return getValidParams().stream().collect(joining(","));
+ }
+
+
+ protected void validateParameters(Map config) {
+ config.forEach((parameterName, ignore) -> validateParameter(parameterName));
+ }
+
+ protected abstract Set getValidParams();
+
+ protected void validateParameter(String parameterName) {
+ if (!getValidParams().contains(parameterName)) {
+ throw new IllegalArgumentException("non existent parameter " + parameterName + ": accepted list is " + commaOnParameters());
+ }
+ }
+
+
+
+}
diff --git a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java
index d42f8e985fa..71b7132099d 100644
--- a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java
+++ b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java
@@ -16,21 +16,20 @@
*/
package org.apache.activemq.artemis.lockmanager;
+import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.activemq.artemis.utils.ClassloadingUtil;
-
public interface DistributedLockManager extends AutoCloseable {
static DistributedLockManager newInstanceOf(String className, Map properties) throws Exception {
- return (DistributedLockManager) ClassloadingUtil.getInstanceForParamsWithTypeCheck(className,
- DistributedLockManager.class,
- DistributedLockManager.class.getClassLoader(),
- new Class[]{Map.class},
- properties);
+ return (DistributedLockManager) getInstanceForParamsWithTypeCheck(className,
+ DistributedLockManager.class,
+ DistributedLockManager.class.getClassLoader(),
+ new Class[]{Map.class},
+ properties);
}
@FunctionalInterface
@@ -59,4 +58,24 @@ interface UnavailableManagerListener {
default void close() {
stop();
}
+
+ // This is copied from ClassLoadingUtils..
+ // I need to cut the dependency here to make it easier for external modules to implement the API here.
+ private static Object getInstanceForParamsWithTypeCheck(String className,
+ Class> expectedType,
+ ClassLoader loader, Class>[] parameterTypes, Object... params) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+ final Class> clazz = loadWithCheck(className, expectedType, loader);
+ return clazz.getDeclaredConstructor(parameterTypes).newInstance(params);
+ }
+
+ private static Class> loadWithCheck(String className,
+ Class> expectedType,
+ ClassLoader loader) throws ClassNotFoundException {
+ Class> clazz = loader.loadClass(className);
+ if (!expectedType.isAssignableFrom(clazz)) {
+ throw new IllegalStateException("clazz [" + className + "] is not assignable from expected type: " + expectedType);
+ }
+ return clazz;
+ }
+
}
\ No newline at end of file
diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
index 3329b88bf79..a47e9e7cd8b 100644
--- a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
+++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java
@@ -23,11 +23,14 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.activemq.artemis.lockmanager.AbstractDistributedLockManager;
import org.apache.activemq.artemis.lockmanager.DistributedLock;
-import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
import org.apache.activemq.artemis.lockmanager.MutableLong;
import org.apache.activemq.artemis.lockmanager.UnavailableStateException;
@@ -42,14 +45,25 @@
* The directory must be created in advance before using this lock manager.
*
*/
-public class FileBasedLockManager implements DistributedLockManager {
+public class FileBasedLockManager extends AbstractDistributedLockManager {
private final File locksFolder;
private final Map locks;
private boolean started;
+ private static final String LOCKS_FOLDER = "locks-folder";
+ private static final Set VALID_PARAMS = Stream.of(
+ LOCKS_FOLDER).collect(Collectors.toSet());
+
+ @Override
+ protected Set getValidParams() {
+ return VALID_PARAMS;
+ }
+
public FileBasedLockManager(Map args) {
this(new File(args.get("locks-folder")));
+
+ validateParameters(args);
}
public FileBasedLockManager(File locksFolder) {
diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
index e6aa6689ca2..cb8ba223390 100644
--- a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
+++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java
@@ -28,8 +28,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.activemq.artemis.lockmanager.AbstractDistributedLockManager;
import org.apache.activemq.artemis.lockmanager.DistributedLock;
-import org.apache.activemq.artemis.lockmanager.DistributedLockManager;
import org.apache.activemq.artemis.lockmanager.MutableLong;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -42,7 +42,6 @@
import org.apache.curator.utils.DebugUtils;
import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.joining;
/**
* ZooKeeper-based distributed lock manager using Apache Curator.
@@ -58,7 +57,7 @@
* retries-ms (optional, default: 1000): Delay in milliseconds between retry attempts
*
*/
-public class CuratorDistributedLockManager implements DistributedLockManager, ConnectionStateListener {
+public class CuratorDistributedLockManager extends AbstractDistributedLockManager implements ConnectionStateListener {
enum PrimitiveType {
lock, mutableLong;
@@ -111,6 +110,11 @@ public int hashCode() {
}
}
+ @Override
+ protected Set getValidParams() {
+ return VALID_PARAMS;
+ }
+
private static final String CONNECT_STRING_PARAM = "connect-string";
private static final String NAMESPACE_PARAM = "namespace";
private static final String SESSION_MS_PARAM = "session-ms";
@@ -126,7 +130,6 @@ public int hashCode() {
CONNECTION_MS_PARAM,
RETRIES_PARAM,
RETRIES_MS_PARAM).collect(Collectors.toSet());
- private static final String VALID_PARAMS_ON_ERROR = VALID_PARAMS.stream().collect(joining(","));
// It's 9 times the default ZK tick time ie 2000 ms
private static final String DEFAULT_SESSION_TIMEOUT_MS = Integer.toString(18_000);
private static final String DEFAULT_CONNECTION_TIMEOUT_MS = Integer.toString(8_000);
@@ -135,17 +138,6 @@ public int hashCode() {
// why 1/3 of the session? https://cwiki.apache.org/confluence/display/CURATOR/TN14
private static final String DEFAULT_SESSION_PERCENT = Integer.toString(33);
- private static Map validateParameters(Map config) {
- config.forEach((parameterName, ignore) -> validateParameter(parameterName));
- return config;
- }
-
- private static void validateParameter(String parameterName) {
- if (!VALID_PARAMS.contains(parameterName)) {
- throw new IllegalArgumentException("non existent parameter " + parameterName + ": accepted list is " + VALID_PARAMS_ON_ERROR);
- }
- }
-
private CuratorFramework client;
private final Map primitives;
private List listeners;
@@ -161,10 +153,6 @@ private static void validateParameter(String parameterName) {
}
public CuratorDistributedLockManager(Map config) {
- this(validateParameters(config), true);
- }
-
- private CuratorDistributedLockManager(Map config, boolean ignore) {
this(config.get(CONNECT_STRING_PARAM),
config.get(NAMESPACE_PARAM),
Integer.parseInt(config.getOrDefault(SESSION_MS_PARAM, DEFAULT_SESSION_TIMEOUT_MS)),
@@ -172,6 +160,7 @@ private CuratorDistributedLockManager(Map config, boolean ignore
Integer.parseInt(config.getOrDefault(CONNECTION_MS_PARAM, DEFAULT_CONNECTION_TIMEOUT_MS)),
Integer.parseInt(config.getOrDefault(RETRIES_PARAM, DEFAULT_RETRIES)),
Integer.parseInt(config.getOrDefault(RETRIES_MS_PARAM, DEFAULT_RETRIES_MS)));
+ validateParameters(config);
}
private CuratorDistributedLockManager(String connectString,
diff --git a/artemis-lockmanager/pom.xml b/artemis-lockmanager/pom.xml
index d913fcd4f67..25b0bd83429 100644
--- a/artemis-lockmanager/pom.xml
+++ b/artemis-lockmanager/pom.xml
@@ -32,6 +32,8 @@
artemis-lockmanager-api
artemis-lockmanager-ri
+ artemis-kube-lock
+ artemis-kube-lock-all
diff --git a/artemis-pom/pom.xml b/artemis-pom/pom.xml
index fb774f29c1b..a94c9f30697 100644
--- a/artemis-pom/pom.xml
+++ b/artemis-pom/pom.xml
@@ -951,6 +951,20 @@
+
+