Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
Expand Down Expand Up @@ -790,6 +791,11 @@ public synchronized void setCurrentVote(Vote v) {
*/
protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE;

private boolean quorumSaslAuthzZnodeEnabled = false;
private String quorumSaslAuthzZnodePath = QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_DEFAULT_PATH;
private final AtomicReference<Set<String>> manualSaslAuthzHosts =
new AtomicReference<>(Collections.emptySet());

public static final String QUORUM_CNXN_TIMEOUT_MS = "zookeeper.quorumCnxnTimeoutMs";
private static int quorumCnxnTimeoutMs;

Expand Down Expand Up @@ -1887,6 +1893,40 @@ private void connectNewPeers(QuorumCnxManager qcm) {
}
}

public void refreshQuorumSaslAuthzHosts(QuorumVerifier... extraQVs) {
if (!(authServer instanceof SaslQuorumAuthServer)) {
return;
}

Set<String> hosts = new HashSet<>();
synchronized (QV_LOCK) {
addHostsFromQV(quorumVerifier, hosts);
addHostsFromQV(lastSeenQuorumVerifier, hosts);
}
Set<String> manualHosts = manualSaslAuthzHosts.get();
if (manualHosts != null) {
hosts.addAll(manualHosts);
}
if (extraQVs != null) {
for (QuorumVerifier qv : extraQVs) {
addHostsFromQV(qv, hosts);
}
}

((SaslQuorumAuthServer) authServer).updateAuthorizedHosts(hosts);
}

private static void addHostsFromQV(QuorumVerifier qv, Set<String> hosts) {
if (qv == null) {
return;
}
for (QuorumServer qs : qv.getAllMembers().values()) {
if (qs != null && qs.hostname != null) {
hosts.add(qs.hostname);
}
}
}

public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
if (!isReconfigEnabled()) {
LOG.info("Dynamic reconfig is disabled, we don't store the last seen config.");
Expand Down Expand Up @@ -1916,6 +1956,7 @@ public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
return;
}
lastSeenQuorumVerifier = qv;
refreshQuorumSaslAuthzHosts();
if (qcm != null) {
connectNewPeers(qcm);
}
Expand Down Expand Up @@ -1975,6 +2016,7 @@ public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk)
setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
}
updateObserverMasterList();
refreshQuorumSaslAuthzHosts();
return prevQV;
}
}
Expand Down Expand Up @@ -2574,6 +2616,55 @@ void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) {
LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize);
}

void setQuorumSaslAuthzZnodeEnabled(boolean enabled) {
quorumSaslAuthzZnodeEnabled = enabled;
}

boolean isQuorumSaslAuthzZnodeEnabled() {
return quorumSaslAuthzZnodeEnabled;
}

void setQuorumSaslAuthzZnodePath(String path) {
if (path == null) {
return;
}
quorumSaslAuthzZnodePath = path.trim();
}

String getQuorumSaslAuthzZnodePath() {
return quorumSaslAuthzZnodePath;
}

void setManualSaslAuthzHosts(String hostsCsv) {
manualSaslAuthzHosts.set(parseAuthzHosts(hostsCsv));
}

void clearManualSaslAuthzHosts() {
manualSaslAuthzHosts.set(Collections.emptySet());
}

// VisibleForTesting
Set<String> getManualSaslAuthzHosts() {
return manualSaslAuthzHosts.get();
}

private static Set<String> parseAuthzHosts(String hostsCsv) {
if (hostsCsv == null) {
return Collections.emptySet();
}
String trimmed = hostsCsv.trim();
if (trimmed.isEmpty()) {
return Collections.emptySet();
}
Set<String> hosts = new HashSet<>();
for (String token : trimmed.split("[,\\s]+")) {
if (!token.isEmpty()) {
hosts.add(token.toLowerCase(Locale.ROOT));
}
}
return Collections.unmodifiableSet(hosts);
}

boolean isQuorumSaslAuthEnabled() {
return quorumSaslEnableAuth;
}
Expand Down Expand Up @@ -2677,6 +2768,8 @@ public static QuorumPeer createFromConfig(QuorumPeerConfig config) throws IOExce
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.setQuorumSaslAuthzZnodeEnabled(config.quorumSaslAuthzZnodeEnabled);
quorumPeer.setQuorumSaslAuthzZnodePath(config.quorumSaslAuthzZnodePath);

if (config.jvmPauseMonitorToRun) {
quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public class QuorumPeerConfig {
protected String quorumLearnerLoginContext = QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
protected String quorumServerLoginContext = QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
protected int quorumCnxnThreadsSize;
protected boolean quorumSaslAuthzZnodeEnabled = false;
protected String quorumSaslAuthzZnodePath = "";

// multi address related configs
private boolean multiAddressEnabled = Boolean.parseBoolean(
Expand Down Expand Up @@ -362,6 +364,16 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti
quorumServerLoginContext = value;
} else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) {
quorumServicePrincipal = value;
} else if (key.equals(QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_ENABLED)) {
quorumSaslAuthzZnodeEnabled = parseBoolean(key, value);
if (quorumSaslAuthzZnodeEnabled && quorumSaslAuthzZnodePath.trim().isEmpty()) {
quorumSaslAuthzZnodePath = QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_DEFAULT_PATH;
}
} else if (key.equals(QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_PATH)) {
quorumSaslAuthzZnodePath = value;
if (!quorumSaslAuthzZnodePath.trim().isEmpty()) {
quorumSaslAuthzZnodeEnabled = true;
}
} else if (key.equals("quorum.cnxn.threads.size")) {
quorumCnxnThreadsSize = Integer.parseInt(value);
} else if (key.equals(JvmPauseMonitor.INFO_THRESHOLD_KEY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,27 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MultiOperationRecord;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.metrics.MetricsContext;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.txn.TxnHeader;

/**
* Abstract base class for all ZooKeeperServers that participate in
Expand All @@ -54,6 +59,18 @@ protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int min
this.self = self;
}

@Override
public synchronized void startup() {
super.startup();
refreshAuthzHostsFromZnode();
}

@Override
public synchronized void startupWithoutServing() {
super.startupWithoutServing();
refreshAuthzHostsFromZnode();
}

@Override
protected void startSessionTracker() {
upgradeableSessionTracker = (UpgradeableSessionTracker) sessionTracker;
Expand Down Expand Up @@ -220,4 +237,111 @@ public void dumpMonitorValues(BiConsumer<String, Object> response) {
response.accept("peer_state", self.getDetailedPeerState());
}

@Override
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc = super.processTxn(hdr, txn);
maybeRefreshAuthzHostsFromTxn(rc);
return rc;
}

@Override
public ProcessTxnResult processTxn(Request request) {
ProcessTxnResult rc = super.processTxn(request);
maybeRefreshAuthzHostsFromTxn(rc);
return rc;
}

private void maybeRefreshAuthzHostsFromTxn(ProcessTxnResult rc) {
if (rc == null) {
return;
}
if (!self.isQuorumSaslAuthEnabled()) {
return;
}
String znodePath = self.getQuorumSaslAuthzZnodePath();
if (znodePath == null || znodePath.isEmpty()) {
return;
}

if (rc.multiResult != null) {
for (ProcessTxnResult sub : rc.multiResult) {
if (isAuthzZnodeTxn(sub, znodePath)) {
if (sub.type == OpCode.delete) {
clearAuthzHostsFromZnode();
} else {
refreshAuthzHostsFromZnode();
}
return;
}
}
return;
}

if (rc.path != null && rc.path.contains("quorumAuthzHosts")) {
LOG.info("Authz znode candidate txn: type={}, path={}, expected={}", rc.type, rc.path, znodePath);
}
if (isAuthzZnodeTxn(rc, znodePath)) {
LOG.info("Authz znode txn applied: type={}, path={}", rc.type, rc.path);
if (rc.type == OpCode.delete) {
clearAuthzHostsFromZnode();
} else {
refreshAuthzHostsFromZnode();
}
}
}

private static boolean isAuthzZnodeTxn(ProcessTxnResult rc, String znodePath) {
if (rc == null || rc.path == null) {
return false;
}
if (!znodePath.equals(rc.path)) {
return false;
}
return rc.type == OpCode.create
|| rc.type == OpCode.create2
|| rc.type == OpCode.createContainer
|| rc.type == OpCode.setData
|| rc.type == OpCode.delete;
}

private void refreshAuthzHostsFromZnode() {
if (!self.isQuorumSaslAuthEnabled()) {
return;
}
String path = self.getQuorumSaslAuthzZnodePath();
if (path == null || path.isEmpty()) {
return;
}
try {
byte[] data = getZKDatabase().getDataTree().getData(path, new Stat(), null);
if (data == null) {
LOG.info("Authz znode read returned null data for {}", path);
return;
}
if (data.length == 0) {
LOG.info("Authz znode read returned empty data for {}", path);
self.clearManualSaslAuthzHosts();
} else {
LOG.info("Authz znode read {} bytes for {}", data.length, path);
self.setManualSaslAuthzHosts(new String(data, StandardCharsets.UTF_8));
}
} catch (KeeperException.NoNodeException e) {
LOG.info("Authz znode missing at {}", path);
return;
} catch (Exception e) {
LOG.warn("Failed to refresh quorum SASL authz hosts from znode {}", path, e);
return;
}

self.refreshQuorumSaslAuthzHosts();
}

private void clearAuthzHostsFromZnode() {
if (!self.isQuorumSaslAuthEnabled()) {
return;
}
self.clearManualSaslAuthzHosts();
self.refreshQuorumSaslAuthzHosts();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public class QuorumAuth {
public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT = "quorum.auth.server.saslLoginContext";
public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE = "QuorumServer";

public static final String QUORUM_SASL_AUTHZ_ZNODE_ENABLED = "quorum.auth.sasl.authzZnode.enabled";
public static final String QUORUM_SASL_AUTHZ_ZNODE_PATH = "quorum.auth.sasl.authzZnode.path";
public static final String QUORUM_SASL_AUTHZ_ZNODE_DEFAULT_PATH = "/zookeeper/quorumAuthzHosts";

static final String QUORUM_SERVER_PROTOCOL_NAME = "zookeeper-quorum";
static final String QUORUM_SERVER_SASL_DIGEST = "zk-quorum-sasl-md5";
static final String QUORUM_AUTH_MESSAGE_TAG = "qpconnect";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.Objects;
import java.util.Set;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
Expand All @@ -45,6 +46,7 @@ public class SaslQuorumAuthServer implements QuorumAuthServer {
private static final int MAX_RETRIES = 5;
private final Login serverLogin;
private final boolean quorumRequireSasl;
private final SaslQuorumServerCallbackHandler saslServerCallbackHandler;

public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set<String> authzHosts) throws SaslException {
this.quorumRequireSasl = quorumRequireSasl;
Expand All @@ -55,7 +57,7 @@ public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set<
"SASL-authentication failed because the specified JAAS configuration section '%s' could not be found.",
loginContext));
}
SaslQuorumServerCallbackHandler saslServerCallbackHandler = new SaslQuorumServerCallbackHandler(
saslServerCallbackHandler = new SaslQuorumServerCallbackHandler(
Configuration.getConfiguration(), loginContext, authzHosts);
serverLogin = new Login(loginContext, saslServerCallbackHandler, new ZKConfig());
serverLogin.startThreadIfNeeded();
Expand All @@ -64,6 +66,12 @@ public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set<
}
}

public void updateAuthorizedHosts(Set<String> authzHosts) {
if (!Objects.isNull(authzHosts)) {
saslServerCallbackHandler.setAuthorizedHosts(authzHosts);
}
}

@Override
public void authenticate(Socket sock, DataInputStream din) throws SaslException {
DataOutputStream dout = null;
Expand Down
Loading
Loading