From 627f00c6c553a266df168b674088c3734de765ff Mon Sep 17 00:00:00 2001 From: Andrei Shitov Date: Thu, 25 Dec 2025 16:56:25 +0300 Subject: [PATCH] feat(ZOOKEEPER-3824): allow SASL allowlist expansion during reconfig --- .../zookeeper/server/quorum/QuorumPeer.java | 93 +++++++++++++ .../server/quorum/QuorumPeerConfig.java | 12 ++ .../server/quorum/QuorumZooKeeperServer.java | 124 ++++++++++++++++++ .../server/quorum/auth/QuorumAuth.java | 4 + .../quorum/auth/SaslQuorumAuthServer.java | 10 +- .../auth/SaslQuorumServerCallbackHandler.java | 31 ++++- .../quorum/QuorumSaslAuthzZnodeTest.java | 108 +++++++++++++++ 7 files changed, 377 insertions(+), 5 deletions(-) create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSaslAuthzZnodeTest.java diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 404aaffcbde..eba5d03a425 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -39,6 +39,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; @@ -790,6 +791,11 @@ public synchronized void setCurrentVote(Vote v) { */ protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE; + private boolean quorumSaslAuthzZnodeEnabled = false; + private String quorumSaslAuthzZnodePath = QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_DEFAULT_PATH; + private final AtomicReference> manualSaslAuthzHosts = + new AtomicReference<>(Collections.emptySet()); + public static final String QUORUM_CNXN_TIMEOUT_MS = "zookeeper.quorumCnxnTimeoutMs"; private static int quorumCnxnTimeoutMs; @@ -1887,6 +1893,40 @@ private void connectNewPeers(QuorumCnxManager qcm) { } } + public void refreshQuorumSaslAuthzHosts(QuorumVerifier... extraQVs) { + if (!(authServer instanceof SaslQuorumAuthServer)) { + return; + } + + Set hosts = new HashSet<>(); + synchronized (QV_LOCK) { + addHostsFromQV(quorumVerifier, hosts); + addHostsFromQV(lastSeenQuorumVerifier, hosts); + } + Set manualHosts = manualSaslAuthzHosts.get(); + if (manualHosts != null) { + hosts.addAll(manualHosts); + } + if (extraQVs != null) { + for (QuorumVerifier qv : extraQVs) { + addHostsFromQV(qv, hosts); + } + } + + ((SaslQuorumAuthServer) authServer).updateAuthorizedHosts(hosts); + } + + private static void addHostsFromQV(QuorumVerifier qv, Set hosts) { + if (qv == null) { + return; + } + for (QuorumServer qs : qv.getAllMembers().values()) { + if (qs != null && qs.hostname != null) { + hosts.add(qs.hostname); + } + } + } + public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) { if (!isReconfigEnabled()) { LOG.info("Dynamic reconfig is disabled, we don't store the last seen config."); @@ -1916,6 +1956,7 @@ public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) { return; } lastSeenQuorumVerifier = qv; + refreshQuorumSaslAuthzHosts(); if (qcm != null) { connectNewPeers(qcm); } @@ -1975,6 +2016,7 @@ public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) setAddrs(qs.addr, qs.electionAddr, qs.clientAddr); } updateObserverMasterList(); + refreshQuorumSaslAuthzHosts(); return prevQV; } } @@ -2574,6 +2616,55 @@ void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) { LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize); } + void setQuorumSaslAuthzZnodeEnabled(boolean enabled) { + quorumSaslAuthzZnodeEnabled = enabled; + } + + boolean isQuorumSaslAuthzZnodeEnabled() { + return quorumSaslAuthzZnodeEnabled; + } + + void setQuorumSaslAuthzZnodePath(String path) { + if (path == null) { + return; + } + quorumSaslAuthzZnodePath = path.trim(); + } + + String getQuorumSaslAuthzZnodePath() { + return quorumSaslAuthzZnodePath; + } + + void setManualSaslAuthzHosts(String hostsCsv) { + manualSaslAuthzHosts.set(parseAuthzHosts(hostsCsv)); + } + + void clearManualSaslAuthzHosts() { + manualSaslAuthzHosts.set(Collections.emptySet()); + } + + // VisibleForTesting + Set getManualSaslAuthzHosts() { + return manualSaslAuthzHosts.get(); + } + + private static Set parseAuthzHosts(String hostsCsv) { + if (hostsCsv == null) { + return Collections.emptySet(); + } + String trimmed = hostsCsv.trim(); + if (trimmed.isEmpty()) { + return Collections.emptySet(); + } + Set hosts = new HashSet<>(); + for (String token : trimmed.split("[,\\s]+")) { + if (!token.isEmpty()) { + hosts.add(token.toLowerCase(Locale.ROOT)); + } + } + return Collections.unmodifiableSet(hosts); + } + boolean isQuorumSaslAuthEnabled() { return quorumSaslEnableAuth; } @@ -2677,6 +2768,8 @@ public static QuorumPeer createFromConfig(QuorumPeerConfig config) throws IOExce quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); } quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); + quorumPeer.setQuorumSaslAuthzZnodeEnabled(config.quorumSaslAuthzZnodeEnabled); + quorumPeer.setQuorumSaslAuthzZnodePath(config.quorumSaslAuthzZnodePath); if (config.jvmPauseMonitorToRun) { quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config)); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java index 1b37f291f51..d5fbecfb9dc 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java @@ -121,6 +121,8 @@ public class QuorumPeerConfig { protected String quorumLearnerLoginContext = QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE; protected String quorumServerLoginContext = QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE; protected int quorumCnxnThreadsSize; + protected boolean quorumSaslAuthzZnodeEnabled = false; + protected String quorumSaslAuthzZnodePath = ""; // multi address related configs private boolean multiAddressEnabled = Boolean.parseBoolean( @@ -362,6 +364,16 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti quorumServerLoginContext = value; } else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) { quorumServicePrincipal = value; + } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_ENABLED)) { + quorumSaslAuthzZnodeEnabled = parseBoolean(key, value); + if (quorumSaslAuthzZnodeEnabled && quorumSaslAuthzZnodePath.trim().isEmpty()) { + quorumSaslAuthzZnodePath = QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_DEFAULT_PATH; + } + } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_PATH)) { + quorumSaslAuthzZnodePath = value; + if (!quorumSaslAuthzZnodePath.trim().isEmpty()) { + quorumSaslAuthzZnodeEnabled = true; + } } else if (key.equals("quorum.cnxn.threads.size")) { quorumCnxnThreadsSize = Integer.parseInt(value); } else if (key.equals(JvmPauseMonitor.INFO_THRESHOLD_KEY)) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java index f27ce827421..e5efbd24f60 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java @@ -21,22 +21,27 @@ import java.io.IOException; import java.io.PrintWriter; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.function.BiConsumer; import java.util.stream.Collectors; +import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.MultiOperationRecord; import org.apache.zookeeper.Op; import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.metrics.MetricsContext; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.server.ByteBufferInputStream; +import org.apache.zookeeper.server.DataTree.ProcessTxnResult; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.txn.TxnHeader; /** * Abstract base class for all ZooKeeperServers that participate in @@ -54,6 +59,18 @@ protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int min this.self = self; } + @Override + public synchronized void startup() { + super.startup(); + refreshAuthzHostsFromZnode(); + } + + @Override + public synchronized void startupWithoutServing() { + super.startupWithoutServing(); + refreshAuthzHostsFromZnode(); + } + @Override protected void startSessionTracker() { upgradeableSessionTracker = (UpgradeableSessionTracker) sessionTracker; @@ -220,4 +237,111 @@ public void dumpMonitorValues(BiConsumer response) { response.accept("peer_state", self.getDetailedPeerState()); } + @Override + public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { + ProcessTxnResult rc = super.processTxn(hdr, txn); + maybeRefreshAuthzHostsFromTxn(rc); + return rc; + } + + @Override + public ProcessTxnResult processTxn(Request request) { + ProcessTxnResult rc = super.processTxn(request); + maybeRefreshAuthzHostsFromTxn(rc); + return rc; + } + + private void maybeRefreshAuthzHostsFromTxn(ProcessTxnResult rc) { + if (rc == null) { + return; + } + if (!self.isQuorumSaslAuthEnabled()) { + return; + } + String znodePath = self.getQuorumSaslAuthzZnodePath(); + if (znodePath == null || znodePath.isEmpty()) { + return; + } + + if (rc.multiResult != null) { + for (ProcessTxnResult sub : rc.multiResult) { + if (isAuthzZnodeTxn(sub, znodePath)) { + if (sub.type == OpCode.delete) { + clearAuthzHostsFromZnode(); + } else { + refreshAuthzHostsFromZnode(); + } + return; + } + } + return; + } + + if (rc.path != null && rc.path.contains("quorumAuthzHosts")) { + LOG.info("Authz znode candidate txn: type={}, path={}, expected={}", rc.type, rc.path, znodePath); + } + if (isAuthzZnodeTxn(rc, znodePath)) { + LOG.info("Authz znode txn applied: type={}, path={}", rc.type, rc.path); + if (rc.type == OpCode.delete) { + clearAuthzHostsFromZnode(); + } else { + refreshAuthzHostsFromZnode(); + } + } + } + + private static boolean isAuthzZnodeTxn(ProcessTxnResult rc, String znodePath) { + if (rc == null || rc.path == null) { + return false; + } + if (!znodePath.equals(rc.path)) { + return false; + } + return rc.type == OpCode.create + || rc.type == OpCode.create2 + || rc.type == OpCode.createContainer + || rc.type == OpCode.setData + || rc.type == OpCode.delete; + } + + private void refreshAuthzHostsFromZnode() { + if (!self.isQuorumSaslAuthEnabled()) { + return; + } + String path = self.getQuorumSaslAuthzZnodePath(); + if (path == null || path.isEmpty()) { + return; + } + try { + byte[] data = getZKDatabase().getDataTree().getData(path, new Stat(), null); + if (data == null) { + LOG.info("Authz znode read returned null data for {}", path); + return; + } + if (data.length == 0) { + LOG.info("Authz znode read returned empty data for {}", path); + self.clearManualSaslAuthzHosts(); + } else { + LOG.info("Authz znode read {} bytes for {}", data.length, path); + self.setManualSaslAuthzHosts(new String(data, StandardCharsets.UTF_8)); + } + } catch (KeeperException.NoNodeException e) { + LOG.info("Authz znode missing at {}", path); + return; + } catch (Exception e) { + LOG.warn("Failed to refresh quorum SASL authz hosts from znode {}", path, e); + return; + } + + self.refreshQuorumSaslAuthzHosts(); + } + + private void clearAuthzHostsFromZnode() { + if (!self.isQuorumSaslAuthEnabled()) { + return; + } + self.clearManualSaslAuthzHosts(); + self.refreshQuorumSaslAuthzHosts(); + } + } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java index 9e5f914747d..032eec36dc1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java @@ -42,6 +42,10 @@ public class QuorumAuth { public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT = "quorum.auth.server.saslLoginContext"; public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE = "QuorumServer"; + public static final String QUORUM_SASL_AUTHZ_ZNODE_ENABLED = "quorum.auth.sasl.authzZnode.enabled"; + public static final String QUORUM_SASL_AUTHZ_ZNODE_PATH = "quorum.auth.sasl.authzZnode.path"; + public static final String QUORUM_SASL_AUTHZ_ZNODE_DEFAULT_PATH = "/zookeeper/quorumAuthzHosts"; + static final String QUORUM_SERVER_PROTOCOL_NAME = "zookeeper-quorum"; static final String QUORUM_SERVER_SASL_DIGEST = "zk-quorum-sasl-md5"; static final String QUORUM_AUTH_MESSAGE_TAG = "qpconnect"; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java index 9b5f48c3b88..0615f01285a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java @@ -23,6 +23,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; +import java.util.Objects; import java.util.Set; import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.Configuration; @@ -45,6 +46,7 @@ public class SaslQuorumAuthServer implements QuorumAuthServer { private static final int MAX_RETRIES = 5; private final Login serverLogin; private final boolean quorumRequireSasl; + private final SaslQuorumServerCallbackHandler saslServerCallbackHandler; public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set authzHosts) throws SaslException { this.quorumRequireSasl = quorumRequireSasl; @@ -55,7 +57,7 @@ public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set< "SASL-authentication failed because the specified JAAS configuration section '%s' could not be found.", loginContext)); } - SaslQuorumServerCallbackHandler saslServerCallbackHandler = new SaslQuorumServerCallbackHandler( + saslServerCallbackHandler = new SaslQuorumServerCallbackHandler( Configuration.getConfiguration(), loginContext, authzHosts); serverLogin = new Login(loginContext, saslServerCallbackHandler, new ZKConfig()); serverLogin.startThreadIfNeeded(); @@ -64,6 +66,12 @@ public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set< } } + public void updateAuthorizedHosts(Set authzHosts) { + if (!Objects.isNull(authzHosts)) { + saslServerCallbackHandler.setAuthorizedHosts(authzHosts); + } + } + @Override public void authenticate(Socket sock, DataInputStream din) throws SaslException { DataOutputStream dout = null; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java index b182ee13a1c..65ab5c0e776 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java @@ -21,8 +21,12 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.NameCallback; @@ -50,7 +54,7 @@ public class SaslQuorumServerCallbackHandler implements CallbackHandler { private String userName; private final boolean isDigestAuthn; private final Map credentials; - private final Set authzHosts; + private final AtomicReference> authzHostsRef = new AtomicReference<>(Collections.emptySet()); public SaslQuorumServerCallbackHandler( Configuration configuration, @@ -92,8 +96,27 @@ public SaslQuorumServerCallbackHandler( this.credentials = Collections.emptyMap(); } - // authorized host lists - this.authzHosts = authzHosts; + setAuthorizedHosts(authzHosts); + } + + void setAuthorizedHosts(Set newHosts) { + if (newHosts == null || newHosts.isEmpty()) { + authzHostsRef.set(Collections.emptySet()); + return; + } + Set normalized = Collections.unmodifiableSet( + newHosts.stream() + .filter(Objects::nonNull) + .map(host -> host.toLowerCase(Locale.ROOT)) + .collect(Collectors.toSet()) + ); + authzHostsRef.set(normalized); + LOG.info("Updated quorum SASL authorized hosts: {}", normalized); + } + + // VisibleForTesting + Set getAuthorizedHostsForTest() { + return authzHostsRef.get(); } public void handle(Callback[] callbacks) throws UnsupportedCallbackException { @@ -147,7 +170,7 @@ private void handleAuthorizeCallback(AuthorizeCallback ac) { if (!isDigestAuthn && authzFlag) { String[] components = authorizationID.split("[/@]"); if (components.length == 3) { - authzFlag = authzHosts.contains(components[1]); + authzFlag = authzHostsRef.get().contains(components[1]); } else { authzFlag = false; } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSaslAuthzZnodeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSaslAuthzZnodeTest.java new file mode 100644 index 00000000000..01498a1cbc9 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSaslAuthzZnodeTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import javax.security.sasl.SaslException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.txn.CreateTxn; +import org.apache.zookeeper.txn.DeleteTxn; +import org.apache.zookeeper.txn.SetDataTxn; +import org.apache.zookeeper.txn.TxnHeader; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class QuorumSaslAuthzZnodeTest { + + @TempDir + public File tmpDir; + + @Test + public void testAuthzHostsRefreshFromZnode() throws Exception { + TrackingQuorumPeer peer = new TrackingQuorumPeer(); + peer.setTickTime(2000); + peer.setMinSessionTimeout(4000); + peer.setMaxSessionTimeout(40000); + peer.setInitialConfig("server.1=localhost:2888:3888:participant"); + peer.setQuorumSaslEnabled(true); + peer.setQuorumSaslAuthzZnodeEnabled(true); + peer.setQuorumSaslAuthzZnodePath("/zookeeper/quorumAuthzHosts"); + + FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir); + ZKDatabase zkDb = new ZKDatabase(snapLog); + LeaderZooKeeperServer zks = new LeaderZooKeeperServer(snapLog, peer, zkDb); + + String path = peer.getQuorumSaslAuthzZnodePath(); + + TxnHeader createHdr = new TxnHeader(1, 1, 1, 1, ZooDefs.OpCode.create); + CreateTxn createTxn = new CreateTxn(path, "HostA,hostB".getBytes(StandardCharsets.UTF_8), + ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 0); + zks.processTxn(createHdr, createTxn); + + assertEquals(new HashSet(Arrays.asList("hosta", "hostb")), peer.getManualSaslAuthzHosts()); + assertEquals(1, peer.getRefreshCalls()); + + TxnHeader setHdr = new TxnHeader(1, 1, 2, 2, ZooDefs.OpCode.setData); + SetDataTxn setDataTxn = new SetDataTxn(path, "hostC".getBytes(StandardCharsets.UTF_8), -1); + zks.processTxn(setHdr, setDataTxn); + + assertEquals(new HashSet(Arrays.asList("hostc")), peer.getManualSaslAuthzHosts()); + assertEquals(2, peer.getRefreshCalls()); + + TxnHeader deleteHdr = new TxnHeader(1, 1, 3, 3, ZooDefs.OpCode.delete); + DeleteTxn deleteTxn = new DeleteTxn(path); + zks.processTxn(deleteHdr, deleteTxn); + + assertTrue(peer.getManualSaslAuthzHosts().isEmpty()); + assertEquals(3, peer.getRefreshCalls()); + } + + private static class TrackingQuorumPeer extends QuorumPeer { + private final AtomicInteger refreshCalls = new AtomicInteger(0); + + TrackingQuorumPeer() throws SaslException { + super(); + } + + @Override + public void refreshQuorumSaslAuthzHosts(QuorumVerifier... extraQVs) { + refreshCalls.incrementAndGet(); + } + + int getRefreshCalls() { + return refreshCalls.get(); + } + + @Override + Set getManualSaslAuthzHosts() { + return super.getManualSaslAuthzHosts(); + } + } +}