diff --git a/artemis-bom/pom.xml b/artemis-bom/pom.xml index 3dabf92fce1..4bf4bae9c94 100644 --- a/artemis-bom/pom.xml +++ b/artemis-bom/pom.xml @@ -393,6 +393,16 @@ artemis-lockmanager-ri ${project.version} + + org.apache.activemq + artemis-kube-lock + ${project.version} + + + org.apache.activemq + artemis-kube-lock-all + ${project.version} + org.apache.activemq artemis-ra diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml index 0536ecea1cb..6d205c69702 100644 --- a/artemis-distribution/pom.xml +++ b/artemis-distribution/pom.xml @@ -126,6 +126,11 @@ org.apache.artemis artemis-lockmanager-ri + + org.apache.artemis + artemis-kube-lock-all + ${project.version} + diff --git a/artemis-lockmanager/artemis-kube-lock-all/pom.xml b/artemis-lockmanager/artemis-kube-lock-all/pom.xml new file mode 100644 index 00000000000..0f3cbfcc5c0 --- /dev/null +++ b/artemis-lockmanager/artemis-kube-lock-all/pom.xml @@ -0,0 +1,209 @@ + + + 4.0.0 + + + org.apache.artemis + artemis-lockmanager + 2.54.0-SNAPSHOT + + + artemis-kube-lock-all + jar + Kube Lock Manager (all dependencies) + + + ${project.basedir}/../.. + + + + + org.apache.artemis + artemis-kube-lock + ${project.version} + true + + + + + org.bouncycastle + bcpkix-jdk18on + ${bc-java-version} + compile + + + org.bouncycastle + bcprov-jdk18on + ${bc-java-version} + compile + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + true + false + false + + + *:* + + + + org.apache.artemis:artemis-lockmanager-api + + + + + + io.kubernetes + artemis.kube.shade.io.kubernetes + + + io.swagger + artemis.kube.shade.io.swagger + + + io.gapi + artemis.kube.shade.io.gapi + + + io.sundr + artemis.kube.shade.io.sundr + + + io.gsonfire + artemis.kube.shade.io.gsonfire + + + com.google + artemis.kube.shade.com.google + + + com.squareup + artemis.kube.shade.com.squareup + + + com.fasterxml + artemis.kube.shade.com.fasterxml + + + okhttp3 + artemis.kube.shade.okhttp3 + + + okio + artemis.kube.shade.okio + + + org.yaml + artemis.kube.shade.org.yaml + + + org.joda + artemis.kube.shade.org.joda + + + org.threeten + artemis.kube.shade.org.threeten + + + org.checkerframework + artemis.kube.shade.org.checkerframework + + + javax.annotation + artemis.kube.shade.javax.annotation + + + kotlin + artemis.kube.shade.kotlin + + + org.jetbrains + artemis.kube.shade.org.jetbrains + + + org.intellij + artemis.kube.shade.org.intellij + + + org.apache.commons + artemis.kube.shade.org.apache.commons + + + org.slf4j + artemis.kube.shade.org.slf4j + + + jakarta + artemis.kube.shade.jakarta + + + org.jose4j + artemis.kube.shade.org.jose4j + + + com.github + artemis.kube.shade.com.github + + + io.github + artemis.kube.shade.io.github + + + org.jspecify + artemis.kube.shade.org.jspecify + + + org.bouncycastle + artemis.kube.shade.org.bouncycastle + + + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + diff --git a/artemis-lockmanager/artemis-kube-lock/pom.xml b/artemis-lockmanager/artemis-kube-lock/pom.xml new file mode 100644 index 00000000000..52ac7ccb962 --- /dev/null +++ b/artemis-lockmanager/artemis-kube-lock/pom.xml @@ -0,0 +1,63 @@ + + + 4.0.0 + + + org.apache.artemis + artemis-lockmanager + 2.54.0-SNAPSHOT + + + artemis-kube-lock + bundle + Kubernetes Lock Manager + + + ${project.basedir}/../.. + + + + + org.apache.artemis + artemis-lockmanager-api + + + + io.kubernetes + client-java + ${kubernetes.client.version} + + + + + io.kubernetes + client-java-extended + ${kubernetes.client.version} + + + + + org.bouncycastle + bcpkix-jdk18on + 1.84 + compile + + + + + diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLock.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLock.java new file mode 100644 index 00000000000..ae0f1340689 --- /dev/null +++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLock.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.artemis.lock.kube; + +import java.io.FileReader; +import java.lang.invoke.MethodHandles; +import java.time.Duration; +import java.time.OffsetDateTime; + +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoordinationV1Api; +import io.kubernetes.client.openapi.models.V1Lease; +import io.kubernetes.client.openapi.models.V1LeaseSpec; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.util.ClientBuilder; +import io.kubernetes.client.util.KubeConfig; +import org.apache.activemq.artemis.lockmanager.DistributedLock; +import org.apache.activemq.artemis.lockmanager.UnavailableStateException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KubeLock implements DistributedLock { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + final String hostname; + final String namespace; + final String id; + private final long leasePeriodSeconds; + + private ApiClient kubeClient; + + private CoordinationV1Api coordinationV1Api; + + + public KubeLock(String hostname, String namespace, String id, long leasePeriodSeconds) throws Exception { + this.hostname = hostname; + this.namespace = namespace; + this.id = id; + String kubeconfigPath = System.getenv("KUBECONFIG"); + if (kubeconfigPath == null) { + kubeconfigPath = System.getProperty("user.home") + "/.kube/config"; + } + logger.debug("using kubeLock client as {}", kubeconfigPath); + kubeClient = ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(new FileReader(kubeconfigPath))).build(); + coordinationV1Api = new CoordinationV1Api(kubeClient); + this.leasePeriodSeconds = leasePeriodSeconds; + } + + @Override + public String getLockId() { + return id; + } + + @Override + public boolean isHeldByCaller() throws UnavailableStateException { + return renewLock(); + } + + private boolean renewLock() throws UnavailableStateException { + try { + // Try to read the existing lease + V1Lease existingLease = coordinationV1Api.readNamespacedLease(id, namespace).execute(); + logger.debug("renewLock, Read lease: renewTime={}, holderIdentity={}, leaseDuration={}", + existingLease.getSpec().getRenewTime(), + existingLease.getSpec().getHolderIdentity(), + existingLease.getSpec().getLeaseDurationSeconds()); + + // Check if we already hold this lease + if (hostname.equals(existingLease.getSpec().getHolderIdentity())) { + // Renew the lease + existingLease.getSpec().setRenewTime(java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC)); + coordinationV1Api.replaceNamespacedLease(id, namespace, existingLease).execute(); + return true; + } + + + // Check if the lease has expired by using the leaseDurationSeconds from the lease spec + // This is more reliable than using our local leasePeriodSeconds + java.time.OffsetDateTime renewTime = existingLease.getSpec().getRenewTime(); + Integer leaseDuration = existingLease.getSpec().getLeaseDurationSeconds(); + + if (renewTime != null && leaseDuration != null) { + OffsetDateTime now = java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC); + + if (logger.isDebugEnabled()) { + logger.debug("renew period:: {}, now = {}, between={}, between seconds={}", renewTime, now, Duration.between(renewTime, now), Duration.between(renewTime, now).toSeconds()); + } + long ageSeconds = java.time.Duration.between(renewTime, now).toSeconds(); + + if (ageSeconds > leaseDuration) { + // Lease has expired, try to acquire it + OffsetDateTime newRenewTime = java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC); + existingLease.getSpec().setHolderIdentity(hostname); + existingLease.getSpec().setAcquireTime(java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC)); + existingLease.getSpec().setRenewTime(newRenewTime); + + logger.debug("acquiring expired lease. Setting renewTime = {}, holderIdentity = {}, leaseDuration = {}", newRenewTime, hostname, leasePeriodSeconds); + + existingLease.getSpec().setLeaseDurationSeconds((int) (leasePeriodSeconds)); + coordinationV1Api.replaceNamespacedLease(id, namespace, existingLease).execute(); + return true; + } + } + + // Lease is held by someone else and not expired + return false; + + } catch (ApiException e) { + if (e.getCode() == 404) { + logger.debug("Create lock"); + return createLock(); + } else { + logger.warn(e.getMessage(), e); + return false; + } + } + } + + @Override + public boolean tryLock() throws UnavailableStateException { + return renewLock(); + } + + private boolean createLock() throws UnavailableStateException { + // Lease doesn't exist, create a new one + try { + V1Lease newLease = new V1Lease(); + newLease.setApiVersion("coordination.k8s.io/v1"); + newLease.setKind("Lease"); + + V1ObjectMeta metadata = new V1ObjectMeta(); + metadata.setName(id); + metadata.setNamespace(namespace); + newLease.setMetadata(metadata); + + V1LeaseSpec spec = new V1LeaseSpec(); + spec.setHolderIdentity(hostname); + spec.setAcquireTime(java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC)); + + spec.setRenewTime(java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC)); + spec.setLeaseDurationSeconds((int) (leasePeriodSeconds)); + newLease.setSpec(spec); + + coordinationV1Api.createNamespacedLease(namespace, newLease).execute(); + return true; + } catch (ApiException createException) { + createException.printStackTrace(); + // Race condition - someone else created it first + logger.warn(createException.getMessage(), createException); + return false; + } + } + + @Override + public void unlock() throws UnavailableStateException { + try { + // Try to read the existing lease + V1Lease existingLease = coordinationV1Api.readNamespacedLease(id, namespace).execute(); + + // Only unlock if we hold the lease + if (hostname.equals(existingLease.getSpec().getHolderIdentity())) { + // Delete the lease to release the lock + coordinationV1Api.deleteNamespacedLease(id, namespace).execute(); + logger.debug("Released lock: {}", id); + } else { + logger.warn("Attempted to unlock {} but it's held by {}", id, existingLease.getSpec().getHolderIdentity()); + } + } catch (ApiException e) { + if (e.getCode() == 404) { + // Lease doesn't exist - already unlocked or expired + logger.debug("Lock {} not found, already released", id); + } else { + throw new UnavailableStateException("Failed to unlock: " + e.getResponseBody(), e); + } + } + } + + @Override + public void addListener(UnavailableLockListener listener) { + + } + + @Override + public void removeListener(UnavailableLockListener listener) { + + } + + @Override + public void close() { + + } +} diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLockManager.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLockManager.java new file mode 100644 index 00000000000..aedf172fd6c --- /dev/null +++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeLockManager.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.artemis.lock.kube; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.activemq.artemis.lockmanager.AbstractDistributedLockManager; +import org.apache.activemq.artemis.lockmanager.DistributedLock; +import org.apache.activemq.artemis.lockmanager.MutableLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.stream.Collectors.joining; + +/** + * Kubernetes-based distributed lock manager implementation. + *

+ * Configuration parameters: + *

+ */ +public class KubeLockManager extends AbstractDistributedLockManager { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String HOSTNAME = "hostname"; + private static final String NAMESPACE = "namespace"; + private static final String LEASE_TIMEOUT = "lease-timeout"; + private static final Set VALID_PARAMS = Stream.of( + HOSTNAME, + NAMESPACE, LEASE_TIMEOUT).collect(Collectors.toSet()); + private static final String VALID_PARAMS_ON_ERROR = VALID_PARAMS.stream().collect(joining(",")); + + private String hostname; + private String namespace; + long leaseTimeout; + + + + public KubeLockManager(Map config) { + this(config.get(HOSTNAME), + config.get(NAMESPACE), + Long.parseLong(config.get(LEASE_TIMEOUT))); + validateParameters(config); + } + + public KubeLockManager(String hostname, String namespace, long leaseTimeout) { + this.hostname = hostname; + this.namespace = namespace; + this.leaseTimeout = leaseTimeout; + + if (hostname == null) { + hostname = System.getenv("HOSTNAME"); + logger.debug("Replaced hostname attribute as {}", hostname); + } + + if (namespace == null) { + try { + namespace = Files.readString(Path.of("/var/run/secrets/kubernetes.io/serviceaccount/namespace")).trim(); + logger.debug("Read namespace from Kubernetes service account: {}", namespace); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + namespace = "default"; + } + } + + if (leaseTimeout <= 0) { + leaseTimeout = 30; + } + } + + + @Override + protected Set getValidParams() { + return VALID_PARAMS; + } + + @Override + public void addUnavailableManagerListener(UnavailableManagerListener listener) { + + } + + @Override + public void removeUnavailableManagerListener(UnavailableManagerListener listener) { + + } + + @Override + public boolean start(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException { + return true; + } + + @Override + public void start() throws InterruptedException, ExecutionException { + } + + @Override + public boolean isStarted() { + return true; + } + + @Override + public void stop() { + + } + + @Override + public DistributedLock getDistributedLock(String lockId) throws InterruptedException, ExecutionException, TimeoutException { + try { + return new KubeLock(hostname, namespace, lockId, leaseTimeout); + } catch (Exception e) { + throw new ExecutionException(e.getMessage(), e); + } + + } + + @Override + public MutableLong getMutableLong(String mutableLongId) throws InterruptedException, ExecutionException, TimeoutException { + try { + return new KubeMutableLong(namespace, mutableLongId); + } catch (Exception e) { + throw new ExecutionException(e.getMessage(), e); + } + } +} diff --git a/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeMutableLong.java b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeMutableLong.java new file mode 100644 index 00000000000..993faf61322 --- /dev/null +++ b/artemis-lockmanager/artemis-kube-lock/src/main/java/org/apache/artemis/lock/kube/KubeMutableLong.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.artemis.lock.kube; + +import java.io.FileReader; +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.Map; + +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1ConfigMap; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.util.ClientBuilder; +import io.kubernetes.client.util.KubeConfig; +import org.apache.activemq.artemis.lockmanager.MutableLong; +import org.apache.activemq.artemis.lockmanager.UnavailableStateException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KubeMutableLong implements MutableLong { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String VALUE_KEY = "value"; + + private final String namespace; + private final String id; + private final String configMapName; + private final ApiClient kubeClient; + private final CoreV1Api coreV1Api; + + public KubeMutableLong(String namespace, String id) throws Exception { + this.namespace = namespace; + this.id = id; + this.configMapName = sanitizeConfigMapName(id); + String kubeconfigPath = System.getenv("KUBECONFIG"); + if (kubeconfigPath == null) { + kubeconfigPath = System.getProperty("user.home") + "/.kube/config"; + } + kubeClient = ClientBuilder.kubeconfig(KubeConfig.loadKubeConfig(new FileReader(kubeconfigPath))).build(); + coreV1Api = new CoreV1Api(kubeClient); + } + + private static String sanitizeConfigMapName(String name) { + return name.toLowerCase().replaceAll("[^a-z0-9.-]", "-"); + } + + @Override + public String getMutableLongId() { + return id; + } + + @Override + public long get() throws UnavailableStateException { + try { + V1ConfigMap configMap = coreV1Api.readNamespacedConfigMap(configMapName, namespace).execute(); + Map data = configMap.getData(); + if (data == null || !data.containsKey(VALUE_KEY)) { + return 0L; + } + return Long.parseLong(data.get(VALUE_KEY)); + } catch (ApiException e) { + if (e.getCode() == 404) { + return 0L; + } + throw new UnavailableStateException("Failed to get mutable long value: " + e.getResponseBody(), e); + } catch (NumberFormatException e) { + throw new UnavailableStateException("Invalid long value stored in ConfigMap", e); + } + } + + @Override + public void set(long value) throws UnavailableStateException { + try { + V1ConfigMap existingConfigMap; + boolean exists = true; + + try { + existingConfigMap = coreV1Api.readNamespacedConfigMap(configMapName, namespace).execute(); + } catch (ApiException e) { + if (e.getCode() == 404) { + exists = false; + existingConfigMap = null; + } else { + throw e; + } + } + + if (exists) { + Map data = existingConfigMap.getData(); + if (data == null) { + data = new HashMap<>(); + existingConfigMap.setData(data); + } + data.put(VALUE_KEY, String.valueOf(value)); + coreV1Api.replaceNamespacedConfigMap(configMapName, namespace, existingConfigMap).execute(); + } else { + V1ConfigMap newConfigMap = new V1ConfigMap(); + newConfigMap.setApiVersion("v1"); + newConfigMap.setKind("ConfigMap"); + + V1ObjectMeta metadata = new V1ObjectMeta(); + metadata.setName(configMapName); + metadata.setNamespace(namespace); + newConfigMap.setMetadata(metadata); + + Map data = new HashMap<>(); + data.put(VALUE_KEY, String.valueOf(value)); + newConfigMap.setData(data); + + coreV1Api.createNamespacedConfigMap(namespace, newConfigMap).execute(); + } + } catch (ApiException e) { + throw new UnavailableStateException("Failed to set mutable long value: " + e.getResponseBody(), e); + } + } + + @Override + public void close() { + // No cleanup needed for now + } +} diff --git a/artemis-lockmanager/artemis-lockmanager-api/pom.xml b/artemis-lockmanager/artemis-lockmanager-api/pom.xml index 874449be2bb..f8c9a45b101 100644 --- a/artemis-lockmanager/artemis-lockmanager-api/pom.xml +++ b/artemis-lockmanager/artemis-lockmanager-api/pom.xml @@ -31,11 +31,4 @@ ${project.basedir}/../.. - - - org.apache.artemis - artemis-commons - - - diff --git a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/AbstractDistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/AbstractDistributedLockManager.java new file mode 100644 index 00000000000..2204b10e500 --- /dev/null +++ b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/AbstractDistributedLockManager.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.lockmanager; + +import java.util.Map; +import java.util.Set; + +import static java.util.stream.Collectors.joining; + +public abstract class AbstractDistributedLockManager implements DistributedLockManager { + + public AbstractDistributedLockManager() { + } + + public AbstractDistributedLockManager(Map properties) { + validateParameters(properties); + } + + protected String commaOnParameters() { + return getValidParams().stream().collect(joining(",")); + } + + + protected void validateParameters(Map config) { + config.forEach((parameterName, ignore) -> validateParameter(parameterName)); + } + + protected abstract Set getValidParams(); + + protected void validateParameter(String parameterName) { + if (!getValidParams().contains(parameterName)) { + throw new IllegalArgumentException("non existent parameter " + parameterName + ": accepted list is " + commaOnParameters()); + } + } + + + +} diff --git a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java index d42f8e985fa..71b7132099d 100644 --- a/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java +++ b/artemis-lockmanager/artemis-lockmanager-api/src/main/java/org/apache/activemq/artemis/lockmanager/DistributedLockManager.java @@ -16,21 +16,20 @@ */ package org.apache.activemq.artemis.lockmanager; +import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.activemq.artemis.utils.ClassloadingUtil; - public interface DistributedLockManager extends AutoCloseable { static DistributedLockManager newInstanceOf(String className, Map properties) throws Exception { - return (DistributedLockManager) ClassloadingUtil.getInstanceForParamsWithTypeCheck(className, - DistributedLockManager.class, - DistributedLockManager.class.getClassLoader(), - new Class[]{Map.class}, - properties); + return (DistributedLockManager) getInstanceForParamsWithTypeCheck(className, + DistributedLockManager.class, + DistributedLockManager.class.getClassLoader(), + new Class[]{Map.class}, + properties); } @FunctionalInterface @@ -59,4 +58,24 @@ interface UnavailableManagerListener { default void close() { stop(); } + + // This is copied from ClassLoadingUtils.. + // I need to cut the dependency here to make it easier for external modules to implement the API here. + private static Object getInstanceForParamsWithTypeCheck(String className, + Class expectedType, + ClassLoader loader, Class[] parameterTypes, Object... params) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException { + final Class clazz = loadWithCheck(className, expectedType, loader); + return clazz.getDeclaredConstructor(parameterTypes).newInstance(params); + } + + private static Class loadWithCheck(String className, + Class expectedType, + ClassLoader loader) throws ClassNotFoundException { + Class clazz = loader.loadClass(className); + if (!expectedType.isAssignableFrom(clazz)) { + throw new IllegalStateException("clazz [" + className + "] is not assignable from expected type: " + expectedType); + } + return clazz; + } + } \ No newline at end of file diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java index 3329b88bf79..a47e9e7cd8b 100644 --- a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java +++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/file/FileBasedLockManager.java @@ -23,11 +23,14 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.activemq.artemis.lockmanager.AbstractDistributedLockManager; import org.apache.activemq.artemis.lockmanager.DistributedLock; -import org.apache.activemq.artemis.lockmanager.DistributedLockManager; import org.apache.activemq.artemis.lockmanager.MutableLong; import org.apache.activemq.artemis.lockmanager.UnavailableStateException; @@ -42,14 +45,25 @@ * The directory must be created in advance before using this lock manager. * */ -public class FileBasedLockManager implements DistributedLockManager { +public class FileBasedLockManager extends AbstractDistributedLockManager { private final File locksFolder; private final Map locks; private boolean started; + private static final String LOCKS_FOLDER = "locks-folder"; + private static final Set VALID_PARAMS = Stream.of( + LOCKS_FOLDER).collect(Collectors.toSet()); + + @Override + protected Set getValidParams() { + return VALID_PARAMS; + } + public FileBasedLockManager(Map args) { this(new File(args.get("locks-folder"))); + + validateParameters(args); } public FileBasedLockManager(File locksFolder) { diff --git a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java index e6aa6689ca2..cb8ba223390 100644 --- a/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java +++ b/artemis-lockmanager/artemis-lockmanager-ri/src/main/java/org/apache/activemq/artemis/lockmanager/zookeeper/CuratorDistributedLockManager.java @@ -28,8 +28,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.activemq.artemis.lockmanager.AbstractDistributedLockManager; import org.apache.activemq.artemis.lockmanager.DistributedLock; -import org.apache.activemq.artemis.lockmanager.DistributedLockManager; import org.apache.activemq.artemis.lockmanager.MutableLong; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -42,7 +42,6 @@ import org.apache.curator.utils.DebugUtils; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.joining; /** * ZooKeeper-based distributed lock manager using Apache Curator. @@ -58,7 +57,7 @@ *
  • retries-ms (optional, default: 1000): Delay in milliseconds between retry attempts
  • * */ -public class CuratorDistributedLockManager implements DistributedLockManager, ConnectionStateListener { +public class CuratorDistributedLockManager extends AbstractDistributedLockManager implements ConnectionStateListener { enum PrimitiveType { lock, mutableLong; @@ -111,6 +110,11 @@ public int hashCode() { } } + @Override + protected Set getValidParams() { + return VALID_PARAMS; + } + private static final String CONNECT_STRING_PARAM = "connect-string"; private static final String NAMESPACE_PARAM = "namespace"; private static final String SESSION_MS_PARAM = "session-ms"; @@ -126,7 +130,6 @@ public int hashCode() { CONNECTION_MS_PARAM, RETRIES_PARAM, RETRIES_MS_PARAM).collect(Collectors.toSet()); - private static final String VALID_PARAMS_ON_ERROR = VALID_PARAMS.stream().collect(joining(",")); // It's 9 times the default ZK tick time ie 2000 ms private static final String DEFAULT_SESSION_TIMEOUT_MS = Integer.toString(18_000); private static final String DEFAULT_CONNECTION_TIMEOUT_MS = Integer.toString(8_000); @@ -135,17 +138,6 @@ public int hashCode() { // why 1/3 of the session? https://cwiki.apache.org/confluence/display/CURATOR/TN14 private static final String DEFAULT_SESSION_PERCENT = Integer.toString(33); - private static Map validateParameters(Map config) { - config.forEach((parameterName, ignore) -> validateParameter(parameterName)); - return config; - } - - private static void validateParameter(String parameterName) { - if (!VALID_PARAMS.contains(parameterName)) { - throw new IllegalArgumentException("non existent parameter " + parameterName + ": accepted list is " + VALID_PARAMS_ON_ERROR); - } - } - private CuratorFramework client; private final Map primitives; private List listeners; @@ -161,10 +153,6 @@ private static void validateParameter(String parameterName) { } public CuratorDistributedLockManager(Map config) { - this(validateParameters(config), true); - } - - private CuratorDistributedLockManager(Map config, boolean ignore) { this(config.get(CONNECT_STRING_PARAM), config.get(NAMESPACE_PARAM), Integer.parseInt(config.getOrDefault(SESSION_MS_PARAM, DEFAULT_SESSION_TIMEOUT_MS)), @@ -172,6 +160,7 @@ private CuratorDistributedLockManager(Map config, boolean ignore Integer.parseInt(config.getOrDefault(CONNECTION_MS_PARAM, DEFAULT_CONNECTION_TIMEOUT_MS)), Integer.parseInt(config.getOrDefault(RETRIES_PARAM, DEFAULT_RETRIES)), Integer.parseInt(config.getOrDefault(RETRIES_MS_PARAM, DEFAULT_RETRIES_MS))); + validateParameters(config); } private CuratorDistributedLockManager(String connectString, diff --git a/artemis-lockmanager/pom.xml b/artemis-lockmanager/pom.xml index d913fcd4f67..25b0bd83429 100644 --- a/artemis-lockmanager/pom.xml +++ b/artemis-lockmanager/pom.xml @@ -32,6 +32,8 @@ artemis-lockmanager-api artemis-lockmanager-ri + artemis-kube-lock + artemis-kube-lock-all diff --git a/artemis-pom/pom.xml b/artemis-pom/pom.xml index fb774f29c1b..a94c9f30697 100644 --- a/artemis-pom/pom.xml +++ b/artemis-pom/pom.xml @@ -951,6 +951,20 @@
    + + + io.kubernetes + client-java + ${kubernetes.client.version} + + + + + io.kubernetes + client-java-extended + ${kubernetes.client.version} + + diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java index 3850c238732..b8967d9fd11 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java @@ -220,7 +220,7 @@ public boolean isLocked() { * @param name a descriptive name for this lock coordinator */ public LockCoordinator(ScheduledExecutorService scheduledExecutor, Executor executor, long checkPeriod, DistributedLockManager lockManager, String lockID, String name) { - super(scheduledExecutor, executor, checkPeriod, checkPeriod, TimeUnit.MILLISECONDS, false); + super(scheduledExecutor, executor, 0, checkPeriod, TimeUnit.MILLISECONDS, false); assert executor != null; this.lockManager = lockManager; this.lockID = lockID; diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 2af3e1b1449..7163d662200 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -3266,7 +3266,7 @@ - A period used to verify if the lock still valid and renew it if needed. + A period in milliseconds used to verify if the lock still valid and renew it if needed. diff --git a/docs/user-manual/lock-coordination.adoc b/docs/user-manual/lock-coordination.adoc index 92c8679876d..e877ab1e118 100644 --- a/docs/user-manual/lock-coordination.adoc +++ b/docs/user-manual/lock-coordination.adoc @@ -202,3 +202,85 @@ This is the recommended approach for production deployments as it provides bette |1000 |Delay in milliseconds between retry attempts |=== + +=== Kubernetes-Based Lock Manager + +The Kubernetes-based lock manager uses Kubernetes Lease resources to manage distributed locks. +This implementation is designed for deployments running in Kubernetes environments and leverages the Kubernetes coordination API for leader election and distributed locking. + +The lock manager uses the Kubernetes Lease coordination mechanism, which automatically handles lease renewal and expiration. +When running inside a Kubernetes cluster, the implementation can automatically detect the namespace from the service account configuration. + +**Class name:** `org.apache.artemis.lock.kube.KubeLockManager` + +**Properties:** + +[cols="1,1,1,3"] +|=== +|Property |Required |Default |Description + +|hostname +|No +|HOSTNAME environment variable +|The hostname identifier for this broker instance. Used to identify which pod currently holds the lock. When running in Kubernetes, this is typically set automatically by the cluster. + +|namespace +|No +|Auto-detected from `/var/run/secrets/kubernetes.io/serviceaccount/namespace`, falls back to "default" +|The Kubernetes namespace where Lease resources will be created and managed. When running inside a Kubernetes cluster, this is automatically read from the service account configuration. + +|lease-timeout +|No +|30 +|The lease timeout in seconds. This determines how long a lease remains valid without renewal. If a broker crashes or loses connectivity, the lease will expire after this period, allowing another broker to acquire it. +|=== + +**Example Configuration:** + +[,xml] +---- + + + org.apache.artemis.lock.kube.KubeLockManager + artemis-ha-lock + 5000 + + + + + + + +---- + +**Kubernetes RBAC Requirements:** + +The broker's service account requires permissions to create, read, update, and delete Lease resources in the coordination.k8s.io API group. +A minimal RBAC configuration would include: + +[,yaml] +---- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: artemis-lease-manager + namespace: artemis-namespace +rules: +- apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["get", "create", "update", "delete"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: artemis-lease-manager-binding + namespace: artemis-namespace +subjects: +- kind: ServiceAccount + name: artemis + namespace: artemis-namespace +roleRef: + kind: Role + name: artemis-lease-manager + apiGroup: rbac.authorization.k8s.io +---- diff --git a/pom.xml b/pom.xml index de17f4126f6..5dbd9bee6ce 100644 --- a/pom.xml +++ b/pom.xml @@ -130,6 +130,9 @@ 3.0.0 10.9 + + 26.0.0 + 3.8.1 diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml index 2e646045aac..ddc22863c30 100644 --- a/tests/smoke-tests/pom.xml +++ b/tests/smoke-tests/pom.xml @@ -31,6 +31,7 @@ -Ddistribution.lib="${activemq.basedir}/artemis-distribution/target/apache-artemis-${project.version}-bin/apache-artemis-${project.version}/lib" localhost + false @@ -44,6 +45,18 @@ ${project.version} compile pom + + + org.apache.artemis + artemis-kube-lock-all + + + + + org.apache.artemis + artemis-kube-lock + ${project.version} + test org.apache.artemis @@ -412,7 +425,7 @@ 1 false ${skipSmokeTests} - ${sts-surefire-extra-args} ${activemq-surefire-argline} ${artemis-distribution-lib-dir} + ${sts-surefire-extra-args} ${activemq-surefire-argline} ${artemis-distribution-lib-dir} -DenableKubernetes=${enableKubernetes} @@ -451,6 +464,15 @@ + + + kubernetes-tests + + true + + + diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml index 4d3edf8ce85..1c9e8e0c5b5 100644 --- a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/A/broker.xml @@ -105,8 +105,8 @@ under the License. - tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 - tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml index 918c417197f..0a709f8b446 100644 --- a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/ZK/B/broker.xml @@ -105,8 +105,8 @@ under the License. - tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 - tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml index f5f50f78360..b973b41b8fe 100644 --- a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/A/broker.xml @@ -95,7 +95,7 @@ under the License. --> - tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml index 8b9b054733a..efa954f38bb 100644 --- a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml +++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/file/B/broker.xml @@ -98,7 +98,7 @@ under the License. --> - tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/A/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/A/broker.xml new file mode 100644 index 00000000000..f7a3f8fa6b7 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/A/broker.xml @@ -0,0 +1,208 @@ + + + + + + + + 0.0.0.0 + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + 1000 + + false + + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + + + + + + + + + + tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + + + + + org.apache.artemis.lock.kube.KubeLockManager + fail + 5000 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + DLQ + ExpiryQueue + 0 + + -1 + 1 + 10 + PAGE + true + true + + + + +
    + + + +
    +
    + + + +
    +
    + + + + + +
    +
    + +
    +
    diff --git a/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/B/broker.xml b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/B/broker.xml new file mode 100644 index 00000000000..3f0a8bd19fb --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/B/broker.xml @@ -0,0 +1,208 @@ + + + + + + + + 0.0.0.0 + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + 1000 + + false + + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + + + + + + + + + + tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + + + + + org.apache.artemis.lock.kube.KubeLockManager + fail + 5000 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + DLQ + ExpiryQueue + 0 + + -1 + 1 + 10 + PAGE + true + true + + + + +
    + + + +
    +
    + + + +
    +
    + + + + + +
    +
    + +
    +
    diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java index 02c163a78dd..2fb169777b1 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java @@ -34,15 +34,19 @@ import org.apache.activemq.artemis.api.core.management.SimpleManagement; import org.apache.activemq.artemis.cli.commands.helper.HelperCreate; +import org.apache.activemq.artemis.json.JsonArray; +import org.apache.activemq.artemis.json.JsonObject; import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.utils.FileUtil; import org.apache.activemq.artemis.utils.Wait; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.fail; @@ -50,6 +54,9 @@ public class DualMirrorSingleAcceptorRunningTest extends SmokeTestBase { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String SERVER_NAME_WITH_KUBE_A = "lockmanager/dualMirrorSingleAcceptor/kube/A"; + public static final String SERVER_NAME_WITH_KUBE_B = "lockmanager/dualMirrorSingleAcceptor/kube/B"; + public static final String SERVER_NAME_WITH_ZK_A = "lockmanager/dualMirrorSingleAcceptor/ZK/A"; public static final String SERVER_NAME_WITH_ZK_B = "lockmanager/dualMirrorSingleAcceptor/ZK/B"; @@ -109,6 +116,22 @@ public void prepareServers() throws Exception { } + @Test + @EnabledIfSystemProperty(named = "enableKubernetes", matches = "true") + public void testAlternatingKube() throws Throwable { + { + createServerPair(SERVER_NAME_WITH_KUBE_A, SERVER_NAME_WITH_KUBE_B, + "./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/A", + "./src/main/resources/servers/lockmanager/dualMirrorSingleAcceptor/kube/B", + null); + + cleanupData(SERVER_NAME_WITH_KUBE_A); + cleanupData(SERVER_NAME_WITH_KUBE_B); + } + + testAlternating(SERVER_NAME_WITH_KUBE_A, SERVER_NAME_WITH_KUBE_B, null, null); + } + @Test public void testAlternatingZK() throws Throwable { { @@ -146,18 +169,18 @@ public void testAlternatingFile() throws Throwable { Properties properties = new Properties(); - properties.put("acceptorConfigurations.artemis.extraParams.amqpCredits", "1000"); - properties.put("acceptorConfigurations.artemis.extraParams.amqpLowCredits", "300"); - properties.put("acceptorConfigurations.artemis.factoryClassName", "org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory"); - properties.put("acceptorConfigurations.artemis.lockCoordinator", "failover"); - properties.put("acceptorConfigurations.artemis.name", "artemis"); - properties.put("acceptorConfigurations.artemis.params.scheme", "tcp"); - properties.put("acceptorConfigurations.artemis.params.tcpReceiveBufferSize", "1048576"); - properties.put("acceptorConfigurations.artemis.params.port", "61616"); - properties.put("acceptorConfigurations.artemis.params.host", "localhost"); - properties.put("acceptorConfigurations.artemis.params.protocols", "CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE"); - properties.put("acceptorConfigurations.artemis.params.useEpoll", "true"); - properties.put("acceptorConfigurations.artemis.params.tcpSendBufferSize", "1048576"); + properties.put("acceptorConfigurations.forClients.extraParams.amqpCredits", "1000"); + properties.put("acceptorConfigurations.forClients.extraParams.amqpLowCredits", "300"); + properties.put("acceptorConfigurations.forClients.factoryClassName", "org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory"); + properties.put("acceptorConfigurations.forClients.lockCoordinator", "failover"); + properties.put("acceptorConfigurations.forClients.name", "forClients"); + properties.put("acceptorConfigurations.forClients.params.scheme", "tcp"); + properties.put("acceptorConfigurations.forClients.params.tcpReceiveBufferSize", "1048576"); + properties.put("acceptorConfigurations.forClients.params.port", "61616"); + properties.put("acceptorConfigurations.forClients.params.host", "localhost"); + properties.put("acceptorConfigurations.forClients.params.protocols", "CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE"); + properties.put("acceptorConfigurations.forClients.params.useEpoll", "true"); + properties.put("acceptorConfigurations.forClients.params.tcpSendBufferSize", "1048576"); properties.put("lockCoordinatorConfigurations.failover.checkPeriod", "5000"); properties.put("lockCoordinatorConfigurations.failover.className", "org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager"); @@ -185,15 +208,20 @@ public void testAlternating(String nameServerA, String nameServerB, File brokerP processB = startServer(nameServerB, 0, -1, brokerPropertiesB); ConnectionFactory cfX = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616"); + String uriManagementA = "tcp://localhost:61000"; + String uriManagementB = "tcp://localhost:61001"; + for (int i = 0; i < ALTERNATING_TEST_ITERATIONS; i++) { logger.info("Iteration {}: Server {} active", i, (i % 2 == 0) ? "A" : "B"); if (i % 2 == 0) { // Even iteration: Server A active, kill Server B killServer(processB); + waitForLockStatus(uriManagementA, true); } else { // Odd iteration: Server B active, kill Server A killServer(processA); + waitForLockStatus(uriManagementB, true); } // Send messages through the shared acceptor @@ -205,14 +233,36 @@ public void testAlternating(String nameServerA, String nameServerB, File brokerP // Restart the killed server if (i % 2 == 0) { processB = startServer(nameServerB, 0, -1, brokerPropertiesB); + waitForLockStatus(uriManagementA, true); + waitForLockStatus(uriManagementB, false); } else { processA = startServer(nameServerA, 0, -1, brokerPropertiesA); + waitForLockStatus(uriManagementA, false); + waitForLockStatus(uriManagementB, true); } } // Verify they both have the expected message count (iterations × (sent - consumed)) - assertMessageCount("tcp://localhost:61000", "myQueue", EXPECTED_FINAL_MESSAGE_COUNT); - assertMessageCount("tcp://localhost:61001", "myQueue", EXPECTED_FINAL_MESSAGE_COUNT); + assertMessageCount(uriManagementA, "myQueue", EXPECTED_FINAL_MESSAGE_COUNT); + assertMessageCount(uriManagementB, "myQueue", EXPECTED_FINAL_MESSAGE_COUNT); + + int countActive = 0; + + if (getLockedStatus(uriManagementA).getBoolean("locked")) { + logger.info("server 0 is locked"); + countActive++; + } else { + logger.debug("server 0 is not locked"); + } + + if (getLockedStatus(uriManagementB).getBoolean("locked")) { + logger.info("server 1 is locked"); + countActive++; + } else { + logger.info("server 1 is not locked"); + } + + assertEquals(1, countActive); } private static void sendMessages(ConnectionFactory cfX, int nmessages) throws JMSException { @@ -258,15 +308,46 @@ private static void receiveMessages(ConnectionFactory cfX, int nmessages) throws } } + protected JsonObject getLockedStatus(String uri) throws Exception { + try (SimpleManagement simpleManagement = new SimpleManagement(uri, null, null)) { + return simpleManagement.listLockCoordinators().getJsonObject(0); + } + } + + protected void waitForLockStatus(String uri, boolean expectedStatus) throws Exception { + try (SimpleManagement simpleManagement = new SimpleManagement(uri, null, null)) { + Wait.assertEquals(expectedStatus, () -> { + int retry = 0; + + do { + try { + JsonArray lockList = simpleManagement.listLockCoordinators(); + return lockList.getJsonObject(0).getBoolean("locked"); + } catch (Exception e) { + logger.info(e.getMessage(), e); + } + Thread.sleep(500); + retry++; + } + while (retry < 10); + + throw new RuntimeException("could not execute lockStatus check"); + + }); + } + } + + protected void assertMessageCount(String uri, String queueName, int count) throws Exception { - SimpleManagement simpleManagement = new SimpleManagement(uri, null, null); - Wait.assertEquals(count, () -> { - try { - return simpleManagement.getMessageCountOnQueue(queueName); - } catch (Throwable e) { - return -1; - } - }); + try (SimpleManagement simpleManagement = new SimpleManagement(uri, null, null)) { + Wait.assertEquals(count, () -> { + try { + return simpleManagement.getMessageCountOnQueue(queueName); + } catch (Throwable e) { + return -1; + } + }); + } } } \ No newline at end of file diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java index 426f4e503d1..93798e713f8 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java @@ -31,6 +31,7 @@ import java.util.function.Supplier; import org.apache.activemq.artemis.core.server.lock.LockCoordinator; +import org.apache.activemq.artemis.lockmanager.DistributedLock; import org.apache.activemq.artemis.lockmanager.DistributedLockManager; import org.apache.activemq.artemis.lockmanager.MutableLong; import org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager; @@ -39,9 +40,11 @@ import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; +import org.apache.artemis.lock.kube.KubeLockManager; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,13 +95,108 @@ public void testWithFile() throws Exception { internalTest(i -> getFileCoordinators(i)); } + + @EnabledIfSystemProperty(named = "enableKubernetes", matches = "true") + @Test + public void testWithKube() throws Exception { + internalTest(i -> getKubeCoordinators(i)); + } + + public void testPassiveLockOnCoordinator(List list) throws Exception { + assertEquals(2, list.size()); + DistributedLockManager lockManager0 = list.get(0).getLockManager(); + DistributedLockManager lockManager1 = list.get(1).getLockManager(); + lockManager0.start(); + lockManager1.start(); + + runAfter(lockManager0::stop); + runAfter(lockManager1::stop); + + LockCoordinator lockCoordinator0 = new LockCoordinator(scheduledExecutor, executorFactory.getExecutor(), 10_000, lockManager0, "lock-it", "lock-it"); + lockCoordinator0.start(); + runAfter(lockCoordinator0::stop); + + Wait.assertTrue(lockCoordinator0::isLocked, 3600000, 100); + + LockCoordinator lockCoordinator1 = new LockCoordinator(scheduledExecutor, executorFactory.getExecutor(), 10_000, lockManager1, "lock-it", "lock-it"); + lockCoordinator1.start(); + runAfter(lockCoordinator1::stop); + + assertFalse(lockCoordinator1.isLocked()); + + + lockCoordinator0.stop(); + + Wait.assertTrue(lockCoordinator1::isLocked, 5000, 100); + } + + + public void testSimplePair(List list) throws Exception { + assertEquals(2, list.size()); + DistributedLockManager lockManager0 = list.get(0).getLockManager(); + DistributedLockManager lockManager1 = list.get(1).getLockManager(); + + lockManager0.start(); + lockManager1.start(); + + try { + for (int i = 0; i < 100; i++) { + DistributedLock lock0 = lockManager0.getDistributedLock("lock"); + assertTrue(lock0.tryLock()); + assertTrue(lock0.isHeldByCaller()); + DistributedLock lock1 = lockManager1.getDistributedLock("lock"); + assertFalse(lock1.tryLock()); + assertFalse(lock1.isHeldByCaller()); + + lock0.unlock(); + assertTrue(lock1.tryLock()); + assertTrue(lock1.isHeldByCaller()); + lock1.unlock(); + } + } finally { + lockManager0.stop(); + lockManager1.stop(); + } + } + + private List getKubeCoordinators(int numberOfCoordinators) { + try { + String hostPortion = "host_" + RandomUtil.randomAlphaNumericString(10); + ArrayList locks = new ArrayList<>(); + String lockName = "lock-test-" + RandomUtil.randomUUIDString(); + for (int i = 0; i < numberOfCoordinators; i++) { + HashMap parameters = new HashMap<>(); + parameters.put("hostname", hostPortion + "_host_" + i); + parameters.put("namespace", "default"); + parameters.put("lease-timeout", "10"); + DistributedLockManager lockManager = DistributedLockManager.newInstanceOf(KubeLockManager.class.getName(), parameters); + + LockCoordinator lockCoordinator = new LockCoordinator(scheduledExecutor, executorFactory.getExecutor(), KEEP_ALIVE_INTERVAL_MS, lockManager, lockName, lockName); + lockCoordinator.onLockAcquired(() -> lock(lockCoordinator)); + lockCoordinator.onLockReleased(() -> unlock(lockCoordinator)); + lockCoordinator.onLockReleased(() -> lockChanged.incrementAndGet()); + lockCoordinator.onLockAcquired(() -> lockChanged.incrementAndGet()); + lockCoordinator.setDebugInfo("ID" + i); + locks.add(lockCoordinator); + } + return locks; + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + @Test public void testWithZK() throws Exception { + ZookeeperCluster zkCluster = startZK(); + internalTest(i -> getZKCoordinators(i, zkCluster.getConnectString())); + } + + private ZookeeperCluster startZK() throws Exception { ZookeeperCluster zkCluster = new ZookeeperCluster(temporaryFolder, 1, ZK_BASE_PORT, 100); zkCluster.start(); runAfter(zkCluster::stop); assertEquals(ZK_ENDPOINTS, zkCluster.getConnectString()); - internalTest(i -> getZKCoordinators(i, zkCluster.getConnectString())); + return zkCluster; } private void internalTest(Function> lockCoordinatorSupplier) throws Exception { @@ -107,6 +205,8 @@ private void internalTest(Function> lockCoordinat testRetryAfterError(lockCoordinatorSupplier.apply(1).get(0)); testRetryAfterErrorWithDelayAdd(lockCoordinatorSupplier.apply(1).get(0)); testPriorityOrdering(lockCoordinatorSupplier.apply(1).get(0)); + testSimplePair(lockCoordinatorSupplier.apply(2)); + testPassiveLockOnCoordinator(getKubeCoordinators(2)); { List list = lockCoordinatorSupplier.apply(2);