diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 10015f74837c..37607d55b0be 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1927,6 +1927,10 @@ public enum ConfVars {
"The pattern to extract a user name. This is effective when you use RegexPrincipalMapper. For example, if " +
"you want to extract a user name from the local part of the email claim, set this to (.*)@example.com."
),
+ CATALOG_VENDED_CREDENTIALS_PROVIDERS("metastore.catalog.vended-credentials.providers",
+ "hive.metastore.catalog.vended-credentials.providers", "",
+ "List of comma-separated credential-vending provider IDs"
+ ),
ICEBERG_CATALOG_SERVLET_PATH("metastore.iceberg.catalog.servlet.path",
"hive.metastore.iceberg.catalog.servlet.path", "iceberg",
"HMS Iceberg Catalog servlet path component of URL endpoint."
@@ -1935,6 +1939,10 @@ public enum ConfVars {
"hive.metastore.iceberg.catalog.cache.expiry", -1,
"HMS Iceberg Catalog cache expiry."
),
+ ICEBERG_CATALOG_VENDED_CREDENTIALS_ENABLED("metastore.iceberg.catalog.vended-credentials.enabled",
+ "hive.metastore.iceberg.catalog.vended-credentials.enabled", false,
+ "Boolean flag to enable credential vending on Iceberg REST Catalog"
+ ),
HTTPSERVER_THREADPOOL_MIN("hive.metastore.httpserver.threadpool.min",
"hive.metastore.httpserver.threadpool.min", 8,
"HMS embedded HTTP server minimum number of threads."
diff --git a/standalone-metastore/metastore-rest-catalog/pom.xml b/standalone-metastore/metastore-rest-catalog/pom.xml
index d987f7cce972..5d7327fbef8f 100644
--- a/standalone-metastore/metastore-rest-catalog/pom.xml
+++ b/standalone-metastore/metastore-rest-catalog/pom.xml
@@ -26,6 +26,13 @@
 <version>1.10.1</version>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${hive.version}</version>
+ <classifier>core</classifier>
+ <scope>provided</scope>
+ </dependency>
 <groupId>org.apache.hive</groupId>
 <artifactId>hive-standalone-metastore-server</artifactId>
@@ -72,10 +79,8 @@
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>${hive.version}</version>
- <classifier>core</classifier>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>bundle</artifactId>
 <scope>test</scope>
@@ -99,6 +104,17 @@
 <classifier>tests</classifier>
 <scope>test</scope>
 </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-aws</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <scope>test</scope>
+ </dependency>
 <dependency>
 <groupId>org.apache.iceberg</groupId>
 <artifactId>iceberg-core</artifactId>
diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/AccessDelegationMode.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/AccessDelegationMode.java
new file mode 100644
index 000000000000..711b3055aa47
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/AccessDelegationMode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+/**
+ * Possible values for the X-Iceberg-Access-Delegation header.
+ */
+public enum AccessDelegationMode {
+ VENDED_CREDENTIALS, REMOTE_SIGNING
+}
diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java
index 73d23ae5daf0..d430ab1c249a 100644
--- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java
+++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java
@@ -21,8 +21,10 @@
import com.google.common.base.Preconditions;
import java.util.Arrays;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Consumer;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BaseTransaction;
@@ -72,12 +74,15 @@
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Original @ RESTCatalogAdapter.java
* Adaptor class to translate REST requests into {@link Catalog} API calls.
*/
public class HMSCatalogAdapter implements RESTClient {
+ private static final Logger LOG = LoggerFactory.getLogger(HMSCatalogAdapter.class);
private static final Splitter SLASH = Splitter.on('/');
 private static final Map<Class<? extends Exception>, Integer> EXCEPTION_ERROR_CODES =
@@ -102,14 +107,15 @@ public class HMSCatalogAdapter implements RESTClient {
private final Catalog catalog;
private final SupportsNamespaces asNamespaceCatalog;
private final ViewCatalog asViewCatalog;
+ private final IcebergVendedCredentialProvider credentialProvider;
-
- public HMSCatalogAdapter(Catalog catalog) {
+ HMSCatalogAdapter(Catalog catalog, IcebergVendedCredentialProvider credentialProvider) {
Preconditions.checkArgument(catalog instanceof SupportsNamespaces);
Preconditions.checkArgument(catalog instanceof ViewCatalog);
this.catalog = catalog;
this.asNamespaceCatalog = (SupportsNamespaces) catalog;
this.asViewCatalog = (ViewCatalog) catalog;
+ this.credentialProvider = credentialProvider;
}
enum Route {
@@ -263,18 +269,21 @@ private ListTablesResponse listTables(Map<String, String> vars) {
return castResponse(ListTablesResponse.class, CatalogHandlers.listTables(catalog, namespace));
}
- private LoadTableResponse createTable(Map<String, String> vars, Object body) {
+ private LoadTableResponse createTable(Set<AccessDelegationMode> accessDelegationModes, Map<String, String> vars,
+ Object body) {
 final Class<LoadTableResponse> responseType = LoadTableResponse.class;
Namespace namespace = namespaceFromPathVars(vars);
CreateTableRequest request = castRequest(CreateTableRequest.class, body);
request.validate();
+ LoadTableResponse response;
if (request.stageCreate()) {
- return castResponse(
- responseType, CatalogHandlers.stageTableCreate(catalog, namespace, request));
+ response = castResponse(
+ responseType, CatalogHandlers.stageTableCreate(catalog, namespace, request));
} else {
- return castResponse(
- responseType, CatalogHandlers.createTable(catalog, namespace, request));
+ response = castResponse(
+ responseType, CatalogHandlers.createTable(catalog, namespace, request));
}
+ return withCredentials(accessDelegationModes, TableIdentifier.of(namespace, request.name()), response);
}
 private RESTResponse dropTable(Map<String, String> vars) {
@@ -292,21 +301,33 @@ private RESTResponse tableExists(Map<String, String> vars) {
return null;
}
- private LoadTableResponse loadTable(Map<String, String> vars) {
+ private LoadTableResponse loadTable(Set<AccessDelegationMode> delegationModes, Map<String, String> vars) {
TableIdentifier ident = identFromPathVars(vars);
- return castResponse(LoadTableResponse.class, CatalogHandlers.loadTable(catalog, ident));
+ LoadTableResponse response =
+ castResponse(LoadTableResponse.class, CatalogHandlers.loadTable(catalog, ident));
+ return withCredentials(delegationModes, ident, response);
}
- private LoadTableResponse registerTable(Map<String, String> vars, Object body) {
- Namespace namespace = namespaceFromPathVars(vars);
- RegisterTableRequest request = castRequest(RegisterTableRequest.class, body);
- return castResponse(LoadTableResponse.class, CatalogHandlers.registerTable(catalog, namespace, request));
+ private LoadTableResponse registerTable(
+ Set<AccessDelegationMode> delegationModes,
+ Map<String, String> vars,
+ Object body) {
+ Namespace namespace = namespaceFromPathVars(vars);
+ RegisterTableRequest request = castRequest(RegisterTableRequest.class, body);
+ LoadTableResponse response =
+ castResponse(LoadTableResponse.class, CatalogHandlers.registerTable(catalog, namespace, request));
+ return withCredentials(delegationModes, TableIdentifier.of(namespace, request.name()), response);
}
- private LoadTableResponse updateTable(Map<String, String> vars, Object body) {
+ private LoadTableResponse updateTable(
+ Set<AccessDelegationMode> delegationModes,
+ Map<String, String> vars,
+ Object body) {
TableIdentifier ident = identFromPathVars(vars);
UpdateTableRequest request = castRequest(UpdateTableRequest.class, body);
- return castResponse(LoadTableResponse.class, CatalogHandlers.updateTable(catalog, ident, request));
+ LoadTableResponse response =
+ castResponse(LoadTableResponse.class, CatalogHandlers.updateTable(catalog, ident, request));
+ return withCredentials(delegationModes, ident, response);
}
private RESTResponse renameTable(Object body) {
@@ -377,6 +398,33 @@ private RESTResponse dropView(Map<String, String> vars) {
return null;
}
+ private LoadTableResponse withCredentials(
+ Set<AccessDelegationMode> accessDelegationModes,
+ TableIdentifier ident,
+ LoadTableResponse response) {
+ if (credentialProvider == null) {
+ return response;
+ }
+
+ if (accessDelegationModes.contains(AccessDelegationMode.VENDED_CREDENTIALS)) {
+ return withVendedCredentials(ident, response);
+ }
+
+ if (accessDelegationModes.contains(AccessDelegationMode.REMOTE_SIGNING)) {
+ LOG.warn("Remote signing is not supported. Ignoring...");
+ }
+
+ return response;
+ }
+
+ private LoadTableResponse withVendedCredentials(TableIdentifier ident, LoadTableResponse response) {
+ final var credentials = credentialProvider.vend(ident, response.tableMetadata().location());
+ return LoadTableResponse.builder()
+ .withTableMetadata(response.tableMetadata())
+ .addAllConfig(response.config())
+ .addAllCredentials(credentials).build();
+ }
+
/**
* This is a very simplistic approach that only validates the requirements for each table and does
* not do any other conflict detection. Therefore, it does not guarantee true transactional
@@ -408,7 +456,10 @@ private static void commitTransaction(Catalog catalog, CommitTransactionRequest
@SuppressWarnings({"MethodLength", "unchecked"})
 private <T extends RESTResponse> T handleRequest(
- Route route, Map<String, String> vars, Object body) {
+ Route route,
+ Set<AccessDelegationMode> accessDelegationModes,
+ Map<String, String> vars,
+ Object body) {
switch (route) {
case CONFIG:
return (T) config();
@@ -435,7 +486,7 @@ private <T extends RESTResponse> T handleRequest(
return (T) listTables(vars);
case CREATE_TABLE:
- return (T) createTable(vars, body);
+ return (T) createTable(accessDelegationModes, vars, body);
case DROP_TABLE:
return (T) dropTable(vars);
@@ -444,13 +495,13 @@ private <T extends RESTResponse> T handleRequest(
return (T) tableExists(vars);
case LOAD_TABLE:
- return (T) loadTable(vars);
+ return (T) loadTable(accessDelegationModes, vars);
case REGISTER_TABLE:
- return (T) registerTable(vars, body);
+ return (T) registerTable(accessDelegationModes, vars, body);
case UPDATE_TABLE:
- return (T) updateTable(vars, body);
+ return (T) updateTable(accessDelegationModes, vars, body);
case RENAME_TABLE:
return (T) renameTable(body);
@@ -491,6 +542,7 @@ private <T extends RESTResponse> T handleRequest(
 <T extends RESTResponse> T execute(
 HTTPMethod method,
 String path,
+ Set<AccessDelegationMode> accessDelegationModes,
 Map<String, String> queryParams,
 Object body,
 Consumer<ErrorResponse> errorHandler) {
@@ -503,7 +555,7 @@ <T extends RESTResponse> T execute(
vars.putAll(queryParams);
}
vars.putAll(routeAndVars.second());
- return handleRequest(routeAndVars.first(), vars.build(), body);
+ return handleRequest(routeAndVars.first(), accessDelegationModes, vars.build(), body);
} catch (RuntimeException e) {
configureResponseFromException(e, errorBuilder);
}
@@ -519,6 +571,15 @@ <T extends RESTResponse> T execute(
throw new RESTException("Unhandled error: %s", error);
}
+ <T extends RESTResponse> T execute(
+ HTTPMethod method,
+ String path,
+ Map<String, String> queryParams,
+ Object body,
+ Consumer<ErrorResponse> errorHandler) {
+ return execute(method, path, EnumSet.noneOf(AccessDelegationMode.class), queryParams, body, errorHandler);
+ }
+
@Override
 public <T extends RESTResponse> T delete(
String path,
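Clients opt in per request via the X-Iceberg-Access-Delegation header. A hedged sketch of an Iceberg REST client asking for vended credentials through the client's header.-prefixed catalog properties (the endpoint URI is a placeholder):

    // Sketch: the "header." property prefix makes the REST client send
    // "X-Iceberg-Access-Delegation: vended-credentials" with each request.
    Map<String, String> props = new HashMap<>();
    props.put("uri", "http://hms-host:9090/iceberg"); // placeholder endpoint
    props.put("header.X-Iceberg-Access-Delegation", "vended-credentials");
    RESTCatalog restCatalog = new RESTCatalog();
    restCatalog.initialize("hms", props);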
diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
index d21f239f3416..9eb029ccd5a7 100644
--- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
+++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
@@ -112,7 +112,11 @@ private HttpServlet createServlet(Catalog catalog) {
// Iceberg REST client uses "catalog" by default
 List<String> scopes = Collections.singletonList("catalog");
ServletSecurity security = new ServletSecurity(AuthType.fromString(authType), configuration, req -> scopes);
- return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalog)));
+ IcebergVendedCredentialProvider vendedCredentialProvider = null;
+ if (MetastoreConf.getBoolVar(configuration, ConfVars.ICEBERG_CATALOG_VENDED_CREDENTIALS_ENABLED)) {
+ vendedCredentialProvider = new IcebergVendedCredentialProvider(configuration);
+ }
+ return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalog, vendedCredentialProvider)));
}
/**
diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java
index 6140f40b2de5..b3601aef7553 100644
--- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java
+++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java
@@ -21,8 +21,11 @@
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Arrays;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServlet;
@@ -45,7 +48,7 @@ public class HMSCatalogServlet extends HttpServlet {
private static final Logger LOG = LoggerFactory.getLogger(HMSCatalogServlet.class);
private static final String CONTENT_TYPE = "Content-Type";
private static final String APPLICATION_JSON = "application/json";
-
+
private final HMSCatalogAdapter restCatalogAdapter;
 private final Map<String, String> responseHeaders =
ImmutableMap.of(CONTENT_TYPE, APPLICATION_JSON);
@@ -75,6 +78,7 @@ protected void service(HttpServletRequest request, HttpServletResponse response)
restCatalogAdapter.execute(
context.method(),
context.path(),
+ context.accessDelegationModes(),
context.queryParams(),
context.body(),
handle(response));
@@ -103,6 +107,7 @@ private Consumer<ErrorResponse> handle(HttpServletResponse response) {
public static class ServletRequestContext {
private HTTPMethod method;
private String path;
+ private Set<AccessDelegationMode> accessDelegationModes;
 private Map<String, String> queryParams;
private Object body;
@@ -115,10 +120,12 @@ private ServletRequestContext(ErrorResponse errorResponse) {
private ServletRequestContext(
HTTPMethod method,
String path,
+ Set<AccessDelegationMode> accessDelegationModes,
 Map<String, String> queryParams,
Object body) {
this.method = method;
this.path = path;
+ this.accessDelegationModes = accessDelegationModes;
this.queryParams = queryParams;
this.body = body;
}
@@ -144,6 +151,21 @@ static ServletRequestContext from(HttpServletRequest request) throws IOException
.build());
}
+ var accessDelegationModes = Arrays
+ .stream(Optional.ofNullable(request.getHeader("X-Iceberg-Access-Delegation")).orElse("").split(","))
+ .map(String::trim)
+ .filter(header -> !header.isEmpty())
+ .map(header -> switch (header) {
+ case "vended-credentials" -> AccessDelegationMode.VENDED_CREDENTIALS;
+ case "remote-signing" -> AccessDelegationMode.REMOTE_SIGNING;
+ default -> {
+ LOG.warn("Unknown access delegation mode: {}", header);
+ yield null;
+ }
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toUnmodifiableSet());
+
Route route = routeContext.first();
Object requestBody = null;
if (route.requestClass() != null) {
@@ -155,7 +177,7 @@ static ServletRequestContext from(HttpServletRequest request) throws IOException
 Map<String, String> queryParams =
 request.getParameterMap().entrySet().stream()
 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()[0]));
- return new ServletRequestContext(method, path, queryParams, requestBody);
+ return new ServletRequestContext(method, path, accessDelegationModes, queryParams, requestBody);
}
HTTPMethod method() {
@@ -166,6 +188,10 @@ public String path() {
return path;
}
+ public Set<AccessDelegationMode> accessDelegationModes() {
+ return accessDelegationModes;
+ }
+
 public Map<String, String> queryParams() {
return queryParams;
}
diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/IcebergVendedCredentialProvider.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/IcebergVendedCredentialProvider.java
new file mode 100644
index 000000000000..3609ed29705c
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/IcebergVendedCredentialProvider.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.credential.CompositeVendedCredentialProvider;
+import org.apache.hadoop.hive.metastore.credential.StorageAccessRequest;
+import org.apache.hadoop.hive.metastore.credential.StorageOperation;
+import org.apache.hadoop.hive.metastore.credential.VendedCredentialProvider;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactoryImpl;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.credentials.Credential;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * This class provides vended credentials for Iceberg.
+ */
+public class IcebergVendedCredentialProvider {
+ private final String catalog;
+ private final VendedCredentialProvider vendedCredentialProvider;
+ private final Supplier<HiveAuthorizer> authorizerSupplier;
+
+ public IcebergVendedCredentialProvider(Configuration conf) {
+ this.catalog = MetaStoreUtils.getDefaultCatalog(conf);
+ this.vendedCredentialProvider = new CompositeVendedCredentialProvider(conf);
+ this.authorizerSupplier = () -> {
+ try {
+ final var hiveConf = HiveConf.cloneConf(conf);
+ final var authorizerFactory = HiveUtils.getAuthorizerFactory(hiveConf,
+ HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER);
+ if (authorizerFactory == null) {
+ throw new IllegalStateException("Iceberg's Credential Vending requires Hive's Authorization Manager");
+ }
+
+ final var authenticator = HiveUtils.getAuthenticator(hiveConf,
+ HiveConf.ConfVars.HIVE_METASTORE_AUTHENTICATOR_MANAGER);
+ authenticator.setConf(hiveConf);
+
+ final var authzContextBuilder = new HiveAuthzSessionContext.Builder();
+ authzContextBuilder.setClientType(HiveAuthzSessionContext.CLIENT_TYPE.HIVEMETASTORE);
+ authzContextBuilder.setSessionString("IcebergRESTCatalog");
+ return authorizerFactory.createHiveAuthorizer(
+ new HiveMetastoreClientFactoryImpl(hiveConf), hiveConf, authenticator, authzContextBuilder.build());
+ } catch (HiveException e) {
+ throw new IllegalStateException("Failed to initialize Hive authorizer for Iceberg credential vending", e);
+ }
+ };
+ }
+
+ @VisibleForTesting
+ IcebergVendedCredentialProvider(String catalog, VendedCredentialProvider vendedCredentialProvider,
+ Supplier<HiveAuthorizer> authorizerSupplier) {
+ this.catalog = catalog;
+ this.vendedCredentialProvider = vendedCredentialProvider;
+ this.authorizerSupplier = authorizerSupplier;
+ }
+
+ /**
+ * Vends credentials for the given table identifier.
+ *
+ * @param identifier the table identifier
+ * @param location the table location
+ * @return the vended credentials
+ */
+ public List<Credential> vend(TableIdentifier identifier, String location) {
+ final String username;
+ try {
+ username = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ final var allowedOperations = resolveAllowedOperations(identifier);
+ if (allowedOperations.isEmpty()) {
+ return Collections.emptyList();
+ }
+ final var request = new StorageAccessRequest(new Path(location), allowedOperations);
+ return vendedCredentialProvider.vend(username, Collections.singletonList(request)).stream()
+ .map(credential -> ImmutableCredential.builder()
+ .prefix(credential.prefix().toString()).config(credential.credentials()).build())
+ .map(x -> (Credential) x)
+ .toList();
+ }
+
+ private Set<StorageOperation> resolveAllowedOperations(TableIdentifier identifier) {
+ Preconditions.checkArgument(identifier.namespace().levels().length == 1);
+ final var database = identifier.namespace().level(0);
+ final var table = identifier.name();
+
+ final var authorizer = authorizerSupplier.get();
+ final var allowedOperations = EnumSet.noneOf(StorageOperation.class);
+ if (isReadable(authorizer, database, table)) {
+ allowedOperations.add(StorageOperation.LIST);
+ allowedOperations.add(StorageOperation.READ);
+ }
+ if (isWritable(authorizer, database, table)) {
+ allowedOperations.add(StorageOperation.CREATE);
+ allowedOperations.add(StorageOperation.DELETE);
+ }
+
+ return allowedOperations;
+ }
+
+ // Check if the user has the SELECT permission
+ private boolean isReadable(HiveAuthorizer authorizer, String database, String table) {
+ final var object = new HivePrivilegeObject(
+ HivePrivilegeObject.HivePrivilegeObjectType.TABLE_OR_VIEW,
+ catalog,
+ database,
+ table
+ );
+ return isAllowed(authorizer, Collections.singletonList(object), Collections.emptyList());
+ }
+
+ // Check if the user has the INSERT INTO permission
+ private boolean isWritable(HiveAuthorizer authorizer, String database, String table) {
+ final var object = new HivePrivilegeObject(
+ HivePrivilegeObject.HivePrivilegeObjectType.TABLE_OR_VIEW,
+ catalog,
+ database,
+ table,
+ null,
+ null,
+ HivePrivilegeObject.HivePrivObjectActionType.INSERT,
+ null);
+ return isAllowed(authorizer, Collections.emptyList(), Collections.singletonList(object));
+ }
+
+ private boolean isAllowed(HiveAuthorizer authorizer, List<HivePrivilegeObject> input,
+ List<HivePrivilegeObject> output) {
+ var context = new HiveAuthzContext.Builder().build();
+ try {
+ authorizer.checkPrivileges(HiveOperationType.QUERY, input, output, context);
+ return true;
+ } catch (HiveAccessControlException e) {
+ return false;
+ } catch (HiveAuthzPluginException e) {
+ throw new IllegalStateException("Failed to check privileges for Iceberg credential vending", e);
+ }
+ }
+}
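Because the provider resolves the caller through UserGroupInformation.getCurrentUser(), direct invocations must run inside a doAs block, as the unit tests below do. A usage sketch with provider construction elided and an illustrative table and location:

    // Sketch only: "provider" is an IcebergVendedCredentialProvider built elsewhere.
    List<Credential> credentials = UserGroupInformation.createRemoteUser("alice")
        .doAs((PrivilegedAction<List<Credential>>) () ->
            provider.vend(TableIdentifier.of("db", "tbl"), "s3a://bucket/warehouse/db/tbl"));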
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestCredentialVendingAws.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestCredentialVendingAws.java
new file mode 100644
index 000000000000..206ecd6c30c1
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestCredentialVendingAws.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.UUID;
+
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
+import org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider;
+import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreExternalTest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.credential.s3.S3VendedCredentialProvider;
+import org.apache.hadoop.hive.metastore.testutils.AwsS3IntegrationTestConfig;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension;
+import org.apache.iceberg.rest.extension.MockHiveAuthorizer;
+import org.apache.iceberg.types.Types;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AccessDeniedException;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+
+@Category(MetastoreExternalTest.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class TestCredentialVendingAws {
+ private static final String ACCESS_DELEGATION_HEADER = "header.X-Iceberg-Access-Delegation";
+ private static final String AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID";
+ private static final String AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY";
+ private static final String AWS_SESSION_TOKEN = "AWS_SESSION_TOKEN";
+
+ private static final Namespace NAMESPACE = Namespace.of("ns");
+ private static final TableIdentifier TABLE = TableIdentifier.of(NAMESPACE, "test");
+ private static final Schema SCHEMA = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
+
+ @RegisterExtension
+ private static final HiveRESTCatalogServerExtension REST_CATALOG_EXTENSION = newServerExtension();
+
+ private AwsS3IntegrationTestConfig config;
+ private RESTCatalog adminCatalog;
+ private S3Client adminS3;
+ private String currentTableRoot;
+ private String tableLocation;
+ private String metadataLocation;
+
+ private static HiveRESTCatalogServerExtension newServerExtension() {
+ var builder = HiveRESTCatalogServerExtension.builder(AuthType.SIMPLE);
+ if (!AwsS3IntegrationTestConfig.isConfigured()) {
+ return builder.build();
+ }
+
+ var config = AwsS3IntegrationTestConfig.fromEnvironment();
+
+ builder.configure(ConfVars.ICEBERG_CATALOG_VENDED_CREDENTIALS_ENABLED.getVarname(), "true");
+ builder.configure(ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname(), "my-s3");
+ builder.configure(ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname() + ".my-s3.class",
+ S3VendedCredentialProvider.class.getName());
+ builder.configure(ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname() + ".my-s3.aws.role-arn",
+ config.roleArn());
+ builder.configure(ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname() + ".my-s3.aws.prefixes",
+ "%s/%s/".formatted(config.bucket(), config.basePath()));
+ builder.configure(ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname() + ".my-s3.aws.region",
+ config.regionId());
+ if (config.externalId() != null && !config.externalId().isBlank()) {
+ builder.configure(ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname() + ".my-s3.aws.external-id",
+ config.externalId());
+ }
+ builder.configure("fs.s3a.impl", S3AFileSystem.class.getName());
+ builder.configure("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A");
+ builder.configure("fs.s3a.endpoint.region", config.regionId());
+ configureS3aCredentials(builder);
+
+ return builder.build();
+ }
+
+ private static void configureS3aCredentials(HiveRESTCatalogServerExtension.Builder builder) {
+ var accessKey = System.getenv(AWS_ACCESS_KEY_ID);
+ var secretKey = System.getenv(AWS_SECRET_ACCESS_KEY);
+ var sessionToken = System.getenv(AWS_SESSION_TOKEN);
+
+ if (accessKey == null || accessKey.isBlank() || secretKey == null || secretKey.isBlank()) {
+ return;
+ }
+
+ builder.configure("fs.s3a.access.key", accessKey);
+ builder.configure("fs.s3a.secret.key", secretKey);
+ if (sessionToken != null && !sessionToken.isBlank()) {
+ builder.configure("fs.s3a.session.token", sessionToken);
+ builder.configure("fs.s3a.aws.credentials.provider", TemporaryAWSCredentialsProvider.class.getName());
+ } else {
+ builder.configure("fs.s3a.aws.credentials.provider", SimpleAWSCredentialsProvider.class.getName());
+ }
+ }
+
+ private RESTCatalog newCatalog(String user, boolean requestVendedCredentials) {
+ var properties = new HashMap<String, String>();
+ properties.put("uri", REST_CATALOG_EXTENSION.getRestEndpoint());
+ properties.put("header.x-actor-username", user);
+ properties.put("io-impl", S3FileIO.class.getName());
+ properties.put("client.region", config.regionId());
+ if (requestVendedCredentials) {
+ properties.put(ACCESS_DELEGATION_HEADER, "vended-credentials");
+ }
+ return RCKUtils.initCatalogClient(properties);
+ }
+
+ private void deletePrefix(String prefix) {
+ String continuationToken = null;
+ boolean truncated;
+ do {
+ var response = adminS3.listObjectsV2(ListObjectsV2Request.builder()
+ .bucket(config.bucket())
+ .prefix(prefix)
+ .continuationToken(continuationToken)
+ .build());
+ response.contents().forEach(object -> adminS3.deleteObject(
+ DeleteObjectRequest.builder().bucket(config.bucket()).key(object.key()).build()));
+ continuationToken = response.nextContinuationToken();
+ truncated = Boolean.TRUE.equals(response.isTruncated());
+ } while (truncated);
+ }
+
+ @BeforeAll
+ void setupAll() {
+ Assumptions.assumeTrue(
+ AwsS3IntegrationTestConfig.isConfigured(),
+ "Set HIVE_IT_AWS_INTEGRATION_TEST_ENABLED=true and configure S3 integration environment variables");
+
+ config = AwsS3IntegrationTestConfig.fromEnvironment();
+ adminS3 = S3Client.builder().region(config.region()).build();
+ adminCatalog = newCatalog("admin", false);
+
+ Assertions.assertEquals(
+ Collections.singletonList(Namespace.of("default")),
+ adminCatalog.listNamespaces());
+ }
+
+ @BeforeEach
+ void setup() {
+ RCKUtils.purgeCatalogTestEntries(adminCatalog);
+ adminCatalog.createNamespace(NAMESPACE);
+ currentTableRoot = "%s/%s".formatted(config.basePath(), UUID.randomUUID());
+ tableLocation = "s3a://%s/%s/table".formatted(config.bucket(), currentTableRoot);
+ var table = adminCatalog.buildTable(TABLE, SCHEMA).withLocation(tableLocation).create();
+ metadataLocation = ((BaseTable) table).operations().refresh().metadataFileLocation();
+ }
+
+ @AfterEach
+ void teardown() {
+ if (adminCatalog != null) {
+ RCKUtils.purgeCatalogTestEntries(adminCatalog);
+ }
+ if (adminS3 != null && currentTableRoot != null) {
+ deletePrefix(currentTableRoot);
+ }
+ currentTableRoot = null;
+ tableLocation = null;
+ metadataLocation = null;
+ }
+
+ @AfterAll
+ void teardownAll() throws Exception {
+ if (adminCatalog != null) {
+ adminCatalog.close();
+ }
+ if (adminS3 != null) {
+ adminS3.close();
+ }
+ }
+
+ @Test
+ void testWritableUser() throws IOException {
+ try (var sessionCatalog = newCatalog("USER_1", true)) {
+ var table = sessionCatalog.loadTable(TABLE);
+
+ var metadataFile = table.io().newInputFile(metadataLocation);
+ Assertions.assertTrue(metadataFile.exists());
+ try (var input = metadataFile.newStream()) {
+ Assertions.assertTrue(new String(input.readAllBytes(), StandardCharsets.UTF_8).contains(tableLocation));
+ }
+
+ var destination = tableLocation + "/credential-vending-it.txt";
+ try (var output = table.io().newOutputFile(destination).createOrOverwrite()) {
+ output.write("content".getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ }
+
+ @Test
+ void testReadOnlyUser() throws IOException {
+ try (var sessionCatalog = newCatalog(MockHiveAuthorizer.READ_ONLY_USER, true)) {
+ var table = sessionCatalog.loadTable(TABLE);
+
+ var metadataFile = table.io().newInputFile(metadataLocation);
+ Assertions.assertTrue(metadataFile.exists());
+ try (var input = metadataFile.newStream()) {
+ Assertions.assertTrue(new String(input.readAllBytes(), StandardCharsets.UTF_8).contains(tableLocation));
+ }
+
+ var destination = tableLocation + "/credential-vending-it.txt";
+ var output = table.io().newOutputFile(destination).createOrOverwrite();
+ Assertions.assertThrows(AccessDeniedException.class, output::close);
+ }
+ }
+}
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestIcebergVendedCredentialProvider.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestIcebergVendedCredentialProvider.java
new file mode 100644
index 000000000000..bd7b91c23239
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestIcebergVendedCredentialProvider.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.credential.StorageAccessRequest;
+import org.apache.hadoop.hive.metastore.credential.StorageOperation;
+import org.apache.hadoop.hive.metastore.credential.VendedCredentialProvider;
+import org.apache.hadoop.hive.metastore.credential.VendedStorageCredential;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.credentials.Credential;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.security.PrivilegedAction;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+
+@Category(MetastoreUnitTest.class)
+public class TestIcebergVendedCredentialProvider {
+ private static final String CATALOG = "catalog";
+ private static final String DATABASE = "database";
+ private static final String TABLE = "tbl";
+ private static final HivePrivilegeObject INPUT_OBJECT = new HivePrivilegeObject(
+ HivePrivilegeObject.HivePrivilegeObjectType.TABLE_OR_VIEW, CATALOG, DATABASE, TABLE
+ );
+ private static final HivePrivilegeObject OUTPUT_OBJECT = new HivePrivilegeObject(
+ HivePrivilegeObject.HivePrivilegeObjectType.TABLE_OR_VIEW, CATALOG, DATABASE, TABLE,
+ null,
+ null,
+ HivePrivilegeObject.HivePrivObjectActionType.INSERT,
+ null
+ );
+
+ private static void assertPrivilegeObjects(List<HivePrivilegeObject> expected, List<?> actual) {
+ Assert.assertEquals(expected.size(), actual.size());
+ var actualObjects = new ArrayList<HivePrivilegeObject>(actual.size());
+ actual.forEach(object -> actualObjects.add((HivePrivilegeObject) object));
+ for (int index = 0; index < expected.size(); index++) {
+ assertPrivilegeObject(expected.get(index), actualObjects.get(index));
+ }
+ }
+
+ private static void assertPrivilegeObject(HivePrivilegeObject expected, HivePrivilegeObject actual) {
+ Assert.assertEquals(expected.getType(), actual.getType());
+ Assert.assertEquals(expected.getCatName(), actual.getCatName());
+ Assert.assertEquals(expected.getDbname(), actual.getDbname());
+ Assert.assertEquals(expected.getObjectName(), actual.getObjectName());
+ Assert.assertEquals(expected.getActionType(), actual.getActionType());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testVendWithWritableUser() throws HiveAccessControlException, HiveAuthzPluginException {
+ var authorizer = Mockito.mock(HiveAuthorizer.class);
+ var delegate = Mockito.mock(VendedCredentialProvider.class);
+ var username = "writable";
+ var operations = EnumSet.of(StorageOperation.LIST, StorageOperation.READ, StorageOperation.CREATE,
+ StorageOperation.DELETE);
+ var path = new Path("s3a://bucket/path");
+ var requests = List.of(new StorageAccessRequest(path, operations));
+ var credential = List.of(new VendedStorageCredential(path, Map.of("key", "k1"), Instant.MAX));
+ Mockito.when(delegate.vend(username, requests)).thenReturn(credential);
+ var provider = new IcebergVendedCredentialProvider(CATALOG, delegate, () -> authorizer);
+ var result = UserGroupInformation.createRemoteUser(username).doAs((PrivilegedAction<List<Credential>>) () ->
+ provider.vend(TableIdentifier.of(DATABASE, TABLE), path.toString()));
+ var expected = ImmutableCredential.builder().prefix(path.toString()).config(Map.of("key", "k1")).build();
+ Assert.assertEquals(List.of(expected), result);
+
+ var inputCaptor = ArgumentCaptor.forClass(List.class);
+ var outputCaptor = ArgumentCaptor.forClass(List.class);
+ Mockito.verify(authorizer, Mockito.times(2)).checkPrivileges(
+ eq(HiveOperationType.QUERY),
+ inputCaptor.capture(),
+ outputCaptor.capture(),
+ any(HiveAuthzContext.class)
+ );
+ assertPrivilegeObjects(List.of(INPUT_OBJECT), inputCaptor.getAllValues().getFirst());
+ assertPrivilegeObjects(List.of(), inputCaptor.getAllValues().getLast());
+ assertPrivilegeObjects(List.of(), outputCaptor.getAllValues().getFirst());
+ assertPrivilegeObjects(List.of(OUTPUT_OBJECT), outputCaptor.getAllValues().getLast());
+ Mockito.verifyNoMoreInteractions(authorizer);
+ Mockito.verify(delegate).vend(username, requests);
+ Mockito.verifyNoMoreInteractions(delegate);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testVendWithReadOnlyUser() throws HiveAccessControlException, HiveAuthzPluginException {
+ var authorizer = Mockito.mock(HiveAuthorizer.class);
+ var delegate = Mockito.mock(VendedCredentialProvider.class);
+ var username = "readonly";
+ var operations = EnumSet.of(StorageOperation.LIST, StorageOperation.READ);
+ var path = new Path("s3a://bucket/path");
+ var requests = List.of(new StorageAccessRequest(path, operations));
+ var credential = List.of(new VendedStorageCredential(path, Map.of("key", "k1"), Instant.MAX));
+ Mockito.when(delegate.vend(username, requests)).thenReturn(credential);
+ Mockito.doAnswer(invocation -> {
+ if (!((List<?>) invocation.getArgument(2)).isEmpty()) {
+ throw new HiveAccessControlException("write denied");
+ }
+ return null;
+ }).when(authorizer).checkPrivileges(any(), any(), any(), any());
+
+ var provider = new IcebergVendedCredentialProvider(CATALOG, delegate, () -> authorizer);
+ var result = UserGroupInformation.createRemoteUser(username).doAs((PrivilegedAction<List<Credential>>) () ->
+ provider.vend(TableIdentifier.of(DATABASE, TABLE), path.toString()));
+ var expected = ImmutableCredential.builder().prefix(path.toString()).config(Map.of("key", "k1")).build();
+ Assert.assertEquals(List.of(expected), result);
+
+ var operationCaptor = ArgumentCaptor.forClass(HiveOperationType.class);
+ var inputCaptor = ArgumentCaptor.forClass(List.class);
+ var outputCaptor = ArgumentCaptor.forClass(List.class);
+ Mockito.verify(authorizer, Mockito.times(2)).checkPrivileges(
+ operationCaptor.capture(),
+ inputCaptor.capture(),
+ outputCaptor.capture(),
+ any(HiveAuthzContext.class)
+ );
+ Assert.assertEquals(List.of(HiveOperationType.QUERY, HiveOperationType.QUERY), operationCaptor.getAllValues());
+ assertPrivilegeObjects(List.of(INPUT_OBJECT), inputCaptor.getAllValues().getFirst());
+ assertPrivilegeObjects(List.of(), inputCaptor.getAllValues().getLast());
+ assertPrivilegeObjects(List.of(), outputCaptor.getAllValues().getFirst());
+ assertPrivilegeObjects(List.of(OUTPUT_OBJECT), outputCaptor.getAllValues().getLast());
+ Mockito.verifyNoMoreInteractions(authorizer);
+ Mockito.verify(delegate).vend(username, requests);
+ Mockito.verifyNoMoreInteractions(delegate);
+ }
+
+ @Test
+ public void testVendWithoutPrivileges() throws HiveAccessControlException, HiveAuthzPluginException {
+ var authorizer = Mockito.mock(HiveAuthorizer.class);
+ var delegate = Mockito.mock(VendedCredentialProvider.class);
+ var username = "denied";
+ Mockito.doThrow(new HiveAccessControlException("denied"))
+ .when(authorizer).checkPrivileges(any(), any(), any(), any());
+
+ var provider = new IcebergVendedCredentialProvider(CATALOG, delegate, () -> authorizer);
+ var result = UserGroupInformation.createRemoteUser(username).doAs((PrivilegedAction<List<Credential>>) () ->
+ provider.vend(TableIdentifier.of(DATABASE, TABLE), "s3a://bucket/path"));
+
+ Assert.assertEquals(List.of(), result);
+ Mockito.verify(authorizer, Mockito.times(2)).checkPrivileges(any(), any(), any(), any());
+ Mockito.verifyNoMoreInteractions(authorizer);
+ Mockito.verifyNoInteractions(delegate);
+ }
+}
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizer.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizer.java
index 4dd2600d3a6f..49b1404f5b0d 100644
--- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizer.java
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizer.java
@@ -18,7 +18,9 @@
package org.apache.iceberg.rest.extension;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
import org.apache.hadoop.hive.ql.security.authorization.plugin.AbstractHiveAuthorizer;
@@ -35,7 +37,9 @@
public class MockHiveAuthorizer extends AbstractHiveAuthorizer {
public static final String PERMISSION_TEST_USER = "permission_test_user";
+ public static final String READ_ONLY_USER = "read_only_user";
private static final Logger LOG = LoggerFactory.getLogger(MockHiveAuthorizer.class);
+ private static final List<PrivilegeCheck> PRIVILEGE_CHECKS = new CopyOnWriteArrayList<>();
private final HiveAuthenticationProvider authenticator;
@@ -43,6 +47,22 @@ public MockHiveAuthorizer(HiveAuthenticationProvider authenticator) {
this.authenticator = authenticator;
}
+ public record PrivilegeCheck(HiveOperationType operationType, List<HivePrivilegeObject> inputs,
+ List<HivePrivilegeObject> outputs) {
+ }
+
+ public static void clearPrivilegeChecks() {
+ PRIVILEGE_CHECKS.clear();
+ }
+
+ public static List<PrivilegeCheck> privilegeChecks() {
+ return new ArrayList<>(PRIVILEGE_CHECKS);
+ }
+
+ private static List<HivePrivilegeObject> copyPrivilegeObjects(List<HivePrivilegeObject> objects) {
+ return objects == null ? List.of() : List.copyOf(objects);
+ }
+
@Override
public VERSION getVersion() {
return null;
@@ -97,9 +117,15 @@ public void checkPrivileges(HiveOperationType hiveOpType, List<HivePrivilegeObject> inputsHObjs,
 List<HivePrivilegeObject> outputHObjs, HiveAuthzContext context) throws HiveAccessControlException {
LOG.info("Checking privileges. User={}, Operation={}, inputs={}, outputs={}", authenticator.getUserName(),
hiveOpType, inputsHObjs, outputHObjs);
+ PRIVILEGE_CHECKS.add(new PrivilegeCheck(hiveOpType, copyPrivilegeObjects(inputsHObjs),
+ copyPrivilegeObjects(outputHObjs)));
if (PERMISSION_TEST_USER.equals(authenticator.getUserName())) {
- throw new HiveAccessControlException(String.format("Unauthorized. Operation=%s, inputs=%s, outputs=%s",
- hiveOpType, inputsHObjs, outputHObjs));
+ throw new HiveAccessControlException(String.format("Unauthorized. User=%s, Operation=%s, inputs=%s, outputs=%s",
+ authenticator.getUserName(), hiveOpType, inputsHObjs, outputHObjs));
+ }
+ if (READ_ONLY_USER.equals(authenticator.getUserName()) && !outputHObjs.isEmpty()) {
+ throw new HiveAccessControlException(String.format("Unauthorized. User=%s, Operation=%s, inputs=%s, outputs=%s",
+ authenticator.getUserName(), hiveOpType, inputsHObjs, outputHObjs));
}
}
diff --git a/standalone-metastore/metastore-server/pom.xml b/standalone-metastore/metastore-server/pom.xml
index 64926eaba1c9..4daed1576556 100644
--- a/standalone-metastore/metastore-server/pom.xml
+++ b/standalone-metastore/metastore-server/pom.xml
@@ -379,7 +379,32 @@
 <groupId>org.springframework</groupId>
 <artifactId>spring-core</artifactId>
 </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>arns</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>iam-policy-builder</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sts</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>url-connection-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>bundle</artifactId>
+ <scope>test</scope>
+ </dependency>
 <dependency>
 <groupId>junit</groupId>
 <artifactId>junit</artifactId>
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/CachedVendedCredentialProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/CachedVendedCredentialProvider.java
new file mode 100644
index 000000000000..c04a380e9e85
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/CachedVendedCredentialProvider.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Expiry;
+import org.checkerframework.checker.index.qual.NonNegative;
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * A VendedCredentialProvider that caches the results of the delegated provider.
+ */
+public class CachedVendedCredentialProvider implements VendedCredentialProvider {
+ private record CacheKey(String username, List<StorageAccessRequest> accessRequests) {}
+
+ private final VendedCredentialProvider delegate;
+ private final Cache<CacheKey, List<VendedStorageCredential>> cache;
+
+ public CachedVendedCredentialProvider(VendedCredentialProvider delegate, long maxSize, Duration maxCacheDuration,
+ Clock clock) {
+ this.delegate = delegate;
+ this.cache = Caffeine.newBuilder().maximumSize(maxSize).expireAfter(
+ new Expiry<CacheKey, List<VendedStorageCredential>>() {
+ private long calculateExpiration(List<VendedStorageCredential> credentials) {
+ var now = clock.instant();
+ // Cache for half of the shortest remaining credential lifetime, to tolerate clock skew
+ var expiredIn = credentials.stream().map(VendedStorageCredential::expiredAt)
+ .map(expiredAt -> Duration.between(now, expiredAt).dividedBy(2)).min(Duration::compareTo);
+ return expiredIn.map(duration -> Math.min(duration.toNanos(), maxCacheDuration.toNanos()))
+ .orElseGet(maxCacheDuration::toNanos);
+ }
+
+ @Override
+ public long expireAfterCreate(@NotNull CachedVendedCredentialProvider.CacheKey key,
+ @NotNull List<VendedStorageCredential> value, long currentTime) {
+ return calculateExpiration(value);
+ }
+
+ @Override
+ public long expireAfterUpdate(@NotNull CachedVendedCredentialProvider.CacheKey key,
+ @NotNull List<VendedStorageCredential> value, long currentTime, @NonNegative long currentDuration) {
+ return calculateExpiration(value);
+ }
+
+ @Override
+ public long expireAfterRead(@NotNull CachedVendedCredentialProvider.CacheKey key,
+ @NotNull List<VendedStorageCredential> value, long currentTime, @NonNegative long currentDuration) {
+ return currentDuration;
+ }
+ }).build();
+ }
+
+ @Override
+ public boolean supports(StorageAccessRequest request) {
+ return delegate.supports(request);
+ }
+
+ @Override
+ public List<VendedStorageCredential> vend(String username, List<StorageAccessRequest> accessRequests) {
+ return cache.get(new CacheKey(username, accessRequests), k -> delegate.vend(username, accessRequests));
+ }
+}
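To make the expiry rule concrete: with the default 30-minute cap, a credential expiring 20 minutes from now is cached for min(20 / 2, 30) = 10 minutes, while one expiring in two hours is capped at 30 minutes. The same computation in isolation:

    // Standalone illustration of the halved-TTL rule in calculateExpiration().
    Duration maxCacheDuration = Duration.ofMinutes(30);
    Duration untilExpiry = Duration.ofMinutes(20); // remaining credential lifetime
    long ttlNanos = Math.min(untilExpiry.dividedBy(2).toNanos(), maxCacheDuration.toNanos());
    // ttlNanos now corresponds to 10 minutes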
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/CompositeVendedCredentialProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/CompositeVendedCredentialProvider.java
new file mode 100644
index 000000000000..91285f1df457
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/CompositeVendedCredentialProvider.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class CompositeVendedCredentialProvider implements VendedCredentialProvider {
+ private static final class FallbackVendedCredentialProvider implements VendedCredentialProvider {
+ @Override
+ public boolean supports(StorageAccessRequest request) {
+ return true;
+ }
+
+ @Override
+ public List<VendedStorageCredential> vend(String username, List<StorageAccessRequest> accessRequests) {
+ return List.of();
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeVendedCredentialProvider.class);
+ private static final String PROVIDERS_KEY_PREFIX =
+ MetastoreConf.ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname();
+ private static final String CLASS_KEY = "class";
+ private static final String CACHE_MAX_SIZE_KEY = "cache.max-size";
+ private static final String CACHE_MAX_DURATION_KEY = "cache.max-duration";
+ private static final Duration DEFAULT_MAX_CACHE_DURATION = Duration.ofMinutes(30);
+ private static final VendedCredentialProvider FALLBACK_PROVIDER = new FallbackVendedCredentialProvider();
+
+ private final List<VendedCredentialProvider> providers;
+
+ private static VendedCredentialProvider create(Configuration conf, String providerId) {
+ final var providerConfigKeyPrefix = "%s.%s.".formatted(PROVIDERS_KEY_PREFIX, providerId);
+ final var classKey = providerConfigKeyPrefix + CLASS_KEY;
+ final var clazz = conf.getClass(classKey, null, VendedCredentialProvider.class);
+ if (clazz == null) {
+ throw new IllegalArgumentException(
+ "No vended credential provider class configured for provider ID: " + providerId);
+ }
+
+ final VendedCredentialProvider provider;
+ try {
+ final var constructor = clazz.getDeclaredConstructor(String.class, Configuration.class);
+ provider = constructor.newInstance(providerConfigKeyPrefix, conf);
+ } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
+ throw new IllegalArgumentException("Failed to instantiate vended credential provider: " + clazz.getName(), e);
+ }
+
+ final var maxCacheSize = conf.getInt(providerConfigKeyPrefix + CACHE_MAX_SIZE_KEY, 0);
+ if (maxCacheSize <= 0) {
+ LOG.info("Created VendedCredentialProvider, {}, without cache", provider);
+ return provider;
+ }
+
+ final var maxCacheDuration = Duration.ofNanos(
+ conf.getTimeDuration(providerConfigKeyPrefix + CACHE_MAX_DURATION_KEY,
+ DEFAULT_MAX_CACHE_DURATION.toNanos(), TimeUnit.NANOSECONDS));
+ LOG.info("Created VendedCredentialProvider, {}, with caching (capacity={}, duration={}) ", provider, maxCacheSize,
+ maxCacheDuration);
+ return new CachedVendedCredentialProvider(provider, maxCacheSize, maxCacheDuration, Clock.systemUTC());
+ }
+
+ public CompositeVendedCredentialProvider(Configuration conf) {
+ this(
+ Arrays
+ .stream(
+ MetastoreConf.getTrimmedStringsVar(conf, MetastoreConf.ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS))
+ .filter(providerId -> !providerId.isEmpty())
+ .map(providerId -> create(conf, providerId))
+ .toList()
+ );
+ }
+
+ @VisibleForTesting
+ CompositeVendedCredentialProvider(List<VendedCredentialProvider> providers) {
+ this.providers = providers;
+ }
+
+ @Override
+ public boolean supports(StorageAccessRequest request) {
+ return true;
+ }
+
+ private VendedCredentialProvider providerFor(StorageAccessRequest request) {
+ return providers.stream()
+ .filter(provider -> provider.supports(request))
+ .findFirst()
+ .orElse(FALLBACK_PROVIDER);
+ }
+
+ @Override
+  public List<VendedStorageCredential> vend(String username, List<StorageAccessRequest> accessRequests) {
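+    // Group requests by their first supporting provider; LinkedHashMap preserves encounter order.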
+ final var requestsByProvider = accessRequests.stream()
+ .collect(Collectors.groupingBy(this::providerFor, LinkedHashMap::new, Collectors.toList()));
+ return requestsByProvider.entrySet().stream()
+ .flatMap(entry -> entry.getKey().vend(username, entry.getValue()).stream())
+ .toList();
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/StorageAccessRequest.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/StorageAccessRequest.java
new file mode 100644
index 000000000000..2605f115af29
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/StorageAccessRequest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A request describing the desired access to the given path.
+ *
+ * @param location a file or directory path. It must be an absolute path
+ * @param operations allowed operations
+ */
+public record StorageAccessRequest(Path location, Set<StorageOperation> operations) {
+ public StorageAccessRequest {
+ if (!Objects.requireNonNull(location).isAbsolute()) {
+ throw new IllegalArgumentException("Location must be absolute. Got: %s".formatted(location));
+ }
+ if (Objects.requireNonNull(operations).isEmpty()) {
+ throw new IllegalArgumentException("Allowed operations cannot be empty");
+ }
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/StorageOperation.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/StorageOperation.java
new file mode 100644
index 000000000000..727d69f9c07b
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/StorageOperation.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+/**
+ * The I/O operations used to access a storage system.
+ */
+public enum StorageOperation {
+ LIST, READ, CREATE, DELETE,
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/VendedCredentialProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/VendedCredentialProvider.java
new file mode 100644
index 000000000000..95b75ba494cf
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/VendedCredentialProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.List;
+
+/**
+ * A credential-vending service.
+ */
+@InterfaceStability.Unstable
+public interface VendedCredentialProvider {
+ /**
+ * Checks whether this provider supports the given access request.
+ *
+ * @param request the access request
+ * @return true if this provider supports the given access request
+ */
+ boolean supports(StorageAccessRequest request);
+
+ /**
+   * Vends credentials for the given access requests.
+ *
+ * @param username the authenticated username
+ * @param accessRequests the vending requests
+ * @return a list of vended credentials
+ */
+  List<VendedStorageCredential> vend(String username, List<StorageAccessRequest> accessRequests);
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/VendedStorageCredential.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/VendedStorageCredential.java
new file mode 100644
index 000000000000..3f00a59634d9
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/VendedStorageCredential.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A vended storage credential: properties scoped to the given path prefix, valid until {@code expiredAt}.
+ */
+@InterfaceStability.Unstable
+public record VendedStorageCredential(Path prefix, Map<String, String> credentials, Instant expiredAt) {
+ public VendedStorageCredential {
+ Objects.requireNonNull(prefix);
+ Objects.requireNonNull(credentials);
+ Objects.requireNonNull(expiredAt);
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/s3/S3Location.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/s3/S3Location.java
new file mode 100644
index 000000000000..6e220f0d8834
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/s3/S3Location.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential.s3;
+
+import org.apache.hadoop.fs.Path;
+import software.amazon.awssdk.arns.Arn;
+
+import java.net.URI;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * An S3 location.
+ */
+final class S3Location {
+  private static final Set<String> SCHEMES = Set.of("s3", "s3a", "s3n");
+
+ private final String partition;
+ private final String bucket;
+ private final String path;
+
+ private S3Location(String partition, String bucket, String path) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.path = path;
+ }
+
+  static Optional<S3Location> create(String partition, URI uri) {
+ final var scheme = uri.getScheme();
+ if (scheme == null) {
+ return Optional.empty();
+ }
+ if (!SCHEMES.contains(scheme)) {
+ return Optional.empty();
+ }
+ final var bucket = uri.getAuthority();
+ if (bucket == null) {
+ return Optional.empty();
+ }
+ final var rawPath = uri.getPath();
+ if (rawPath == null) {
+ return Optional.empty();
+ }
+ final var path = rawPath.endsWith(Path.SEPARATOR) ? rawPath : rawPath + Path.SEPARATOR;
+ return Optional.of(new S3Location(partition, bucket, path));
+ }
+
+ Arn getBucketArn() {
+ return Arn.builder().partition(partition).service("s3").resource(bucket).build();
+ }
+
+ Arn getWildCardArn() {
+ return Arn.builder().partition(partition).service("s3").resource("%s%s*".formatted(bucket, path)).build();
+ }
+
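+  /** Returns the object-key prefix pattern (leading separator stripped) for s3:prefix conditions. */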
+ String getWildCardPath() {
+ return path.substring(1) + "*";
+ }
+
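+  /**
+   * Checks whether this location falls under the given prefix, which may be either an S3 ARN
+   * ({@code arn:<partition>:s3:::bucket/path/}) or a plain {@code bucket/path} string.
+   */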
+ boolean matches(String prefix) {
+ final var optionalArn = Arn.tryFromString(prefix);
+ if (optionalArn.isPresent()) {
+ return getWildCardArn().toString().startsWith(prefix);
+ }
+ return "%s%s".formatted(bucket, path).startsWith(prefix);
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/s3/S3VendedCredentialProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/s3/S3VendedCredentialProvider.java
new file mode 100644
index 000000000000..24b9fd6b7866
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/s3/S3VendedCredentialProvider.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential.s3;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.credential.StorageAccessRequest;
+import org.apache.hadoop.hive.metastore.credential.VendedCredentialProvider;
+import org.apache.hadoop.hive.metastore.credential.VendedStorageCredential;
+import software.amazon.awssdk.arns.Arn;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
+import software.amazon.awssdk.policybuilder.iam.IamEffect;
+import software.amazon.awssdk.policybuilder.iam.IamPolicy;
+import software.amazon.awssdk.policybuilder.iam.IamStatement;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A VendedCredentialProvider specialized for Amazon S3.
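+ *
+ * <p>A minimal configuration sketch; the provider ID {@code s3} and the role ARN below are
+ * illustrative, not prescribed values:
+ * <pre>
+ * metastore.catalog.vended-credentials.providers=s3
+ * metastore.catalog.vended-credentials.providers.s3.class=org.apache.hadoop.hive.metastore.credential.s3.S3VendedCredentialProvider
+ * metastore.catalog.vended-credentials.providers.s3.aws.role-arn=arn:aws:iam::123456789012:role/hms-vending-role
+ * metastore.catalog.vended-credentials.providers.s3.aws.region=us-east-1
+ * </pre>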
+ */
+public class S3VendedCredentialProvider implements VendedCredentialProvider {
+ private static final String PREFIXES_KEY = "aws.prefixes";
+ private static final String REGION_KEY = "aws.region";
+ private static final String ROLE_ARN_KEY = "aws.role-arn";
+ private static final String EXTERNAL_ID_KEY = "aws.external-id";
+ private static final String CREDENTIAL_EXPIRATION_KEY = "aws.expiration";
+ private static final String SESSION_PREFIX = "hms_";
+
+ private final Arn roleArn;
+ private final String externalId;
+  private final List<String> prefixes;
+ private final int expirationInSeconds;
+ private final StsClient stsClient;
+
+ private static StsClient createStsClient(String region) {
+ final var credentialsProvider = DefaultCredentialsProvider.builder().build();
+ final var builder = StsClient.builder().credentialsProvider(credentialsProvider);
+ return region == null ? builder.build() : builder.region(Region.of(region)).build();
+ }
+
+  private static List<String> createPrefixes(String[] prefixes) {
+ if (prefixes == null) {
+ return Collections.emptyList();
+ }
+ return Arrays.asList(prefixes);
+ }
+
+ public S3VendedCredentialProvider(String configKeyPrefix, Configuration conf) {
+ this(
+ Arn.fromString(Objects.requireNonNull(conf.get(configKeyPrefix + ROLE_ARN_KEY))),
+ conf.get(configKeyPrefix + EXTERNAL_ID_KEY),
+ createPrefixes(conf.getStrings(configKeyPrefix + PREFIXES_KEY, (String) null)),
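+        // Clamp the configured expiration (default 3600 seconds) to the int range expected by STS durationSeconds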
+ (int) Math.min(
+ Integer.MAX_VALUE,
+ conf.getTimeDuration(configKeyPrefix + CREDENTIAL_EXPIRATION_KEY, 3600, TimeUnit.SECONDS)
+ ),
+ createStsClient(conf.get(configKeyPrefix + REGION_KEY))
+ );
+ }
+
+ @VisibleForTesting
+  S3VendedCredentialProvider(Arn roleArn, String externalId, List<String> prefixes, int expirationSeconds,
+ StsClient stsClient) {
+ this.prefixes = prefixes;
+ this.roleArn = Objects.requireNonNull(roleArn);
+ this.externalId = externalId;
+ this.expirationInSeconds = expirationSeconds;
+ this.stsClient = Objects.requireNonNull(stsClient);
+ }
+
+ @Override
+ public boolean supports(StorageAccessRequest request) {
+ final var optionalLocation = S3Location.create(roleArn.partition(), request.location().toUri());
+ if (optionalLocation.isEmpty()) {
+ return false;
+ }
+ if (prefixes.isEmpty()) {
+ // Accepts all legal S3 paths
+ return true;
+ }
+ final var location = optionalLocation.orElseThrow();
+ return prefixes.stream().anyMatch(location::matches);
+ }
+
+ @Override
+  public List<VendedStorageCredential> vend(String username, List<StorageAccessRequest> accessRequests) {
+    // This provider issues a single assume-role request and returns one merged credential to reduce the number of STS calls
+ final var assumeRoleRequest = AssumeRoleRequest.builder().externalId(externalId).roleArn(roleArn.toString())
+ .roleSessionName(createRoleSessionName(username)).durationSeconds(expirationInSeconds)
+ .policy(buildPolicy(accessRequests).toJson()).build();
+ final var response = stsClient.assumeRole(assumeRoleRequest);
+ final var awsCredentials = response.credentials();
+
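+    // Property keys follow Iceberg's S3FileIO configuration names so REST clients can consume the map directly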
+    final var builder = new HashMap<String, String>();
+ builder.put("s3.access-key-id", awsCredentials.accessKeyId());
+ builder.put("s3.secret-access-key", awsCredentials.secretAccessKey());
+ builder.put("s3.session-token", awsCredentials.sessionToken());
+ final Instant expiredAt;
+ if (awsCredentials.expiration() == null) {
+ expiredAt = Instant.MAX;
+ } else {
+ expiredAt = awsCredentials.expiration();
+ final var epochMillis = String.valueOf(awsCredentials.expiration().toEpochMilli());
+ builder.put("s3.session-token-expires-at-ms", epochMillis);
+ }
+ final var credentials = Collections.unmodifiableMap(builder);
+
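+    // Every request receives the same merged credential, keyed by its own location prefix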
+ return accessRequests.stream()
+ .map(request -> new VendedStorageCredential(request.location(), credentials, expiredAt)).toList();
+ }
+
+ @Override
+ public String toString() {
+ return "S3VendedCredentialProvider{" + "role='" + roleArn + '\'' + ", prefixes=" + prefixes + '}';
+ }
+
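+  /**
+   * Builds a valid STS RoleSessionName from the username, replacing characters outside the
+   * allowed set with '-' (STS only permits session names matching [\w+=,.@-]).
+   */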
+ private static String createRoleSessionName(String username) {
+ final var builder = new StringBuilder(SESSION_PREFIX.length() + username.length());
+ builder.append(SESSION_PREFIX);
+    for (char c : username.toCharArray()) {
+ if ("abcdefghijklmnopqrstuvwxyz0123456789,.@-".contains(
+ Character.toString(c).toLowerCase(Locale.ENGLISH))) {
+ builder.append(c);
+ } else {
+ builder.append('-');
+ }
+ }
+ return builder.toString();
+ }
+
+ /**
+   * Creates a down-scoped IAM policy for the given requests: one GetBucketLocation statement per
+   * bucket, ListBucket statements constrained by s3:prefix conditions, and object-level statements
+   * (GetObject/GetObjectVersion, PutObject, DeleteObject) on wildcard resource ARNs.
+ */
+  private IamPolicy buildPolicy(List<StorageAccessRequest> requests) {
+    final Map<String, IamStatement.Builder> bucketLocationBuilder = new HashMap<>();
+    final Map<String, IamStatement.Builder> listBuilder = new HashMap<>();
+    final List<String> readResources = new ArrayList<>();
+    final List<String> createResources = new ArrayList<>();
+    final List<String> deleteResources = new ArrayList<>();
+
+ requests.forEach(request -> {
+ Preconditions.checkArgument(supports(request));
+ final var s3Location = S3Location.create(roleArn.partition(), request.location().toUri()).orElseThrow();
+ final var bucketArn = s3Location.getBucketArn().toString();
+ final var wildCardArn = s3Location.getWildCardArn().toString();
+ bucketLocationBuilder.computeIfAbsent(bucketArn,
+ key -> IamStatement.builder().effect(IamEffect.ALLOW).addAction("s3:GetBucketLocation").addResource(key));
+ request.operations().forEach(action -> {
+ switch (action) {
+ case LIST -> listBuilder.computeIfAbsent(bucketArn,
+ key -> IamStatement.builder().effect(IamEffect.ALLOW).addAction("s3:ListBucket").addResource(key))
+ .addCondition(IamConditionOperator.STRING_LIKE, "s3:prefix", s3Location.getWildCardPath());
+ case READ -> readResources.add(wildCardArn);
+ case CREATE -> createResources.add(wildCardArn);
+ case DELETE -> deleteResources.add(wildCardArn);
+ default -> throw new IllegalArgumentException("Unexpected action: " + action);
+ }
+ });
+ });
+
+ final var policyBuilder = IamPolicy.builder();
+ bucketLocationBuilder.values().stream().map(IamStatement.Builder::build).forEach(policyBuilder::addStatement);
+ listBuilder.values().stream().map(IamStatement.Builder::build).forEach(policyBuilder::addStatement);
+ if (!readResources.isEmpty()) {
+ policyBuilder.addStatement(builder -> {
+ builder.effect(IamEffect.ALLOW).addAction("s3:GetObject").addAction("s3:GetObjectVersion");
+ readResources.forEach(builder::addResource);
+ });
+ }
+ if (!createResources.isEmpty()) {
+ policyBuilder.addStatement(builder -> {
+ builder.effect(IamEffect.ALLOW).addAction("s3:PutObject");
+ createResources.forEach(builder::addResource);
+ });
+ }
+ if (!deleteResources.isEmpty()) {
+ policyBuilder.addStatement(builder -> {
+ builder.effect(IamEffect.ALLOW).addAction("s3:DeleteObject");
+ deleteResources.forEach(builder::addResource);
+ });
+ }
+ return policyBuilder.build();
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/annotation/MetastoreExternalTest.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/annotation/MetastoreExternalTest.java
new file mode 100644
index 000000000000..ea436d77433d
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/annotation/MetastoreExternalTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.annotation;
+
+/**
+ * Marker interface for tests that require external systems or credentials and should only run
+ * when explicitly opted into.
+ */
+public interface MetastoreExternalTest extends MetastoreTest {
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/TestCachedVendedCredentialProvider.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/TestCachedVendedCredentialProvider.java
new file mode 100644
index 000000000000..caeaa093c8e9
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/TestCachedVendedCredentialProvider.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+@Category(MetastoreUnitTest.class)
+public class TestCachedVendedCredentialProvider {
+ @Test
+ public void testSupportsDelegatesToWrappedProvider() {
+ var request = new StorageAccessRequest(
+ new Path("s3://bucket-a/warehouse/table"),
+ EnumSet.of(StorageOperation.READ));
+ var delegate = Mockito.mock(VendedCredentialProvider.class);
+ Mockito.when(delegate.supports(request)).thenReturn(true);
+ var provider = new CachedVendedCredentialProvider(
+ delegate,
+ 100,
+ Duration.ofMinutes(30),
+ Clock.systemUTC());
+
+ Assert.assertTrue(provider.supports(request));
+ Mockito.verify(delegate).supports(request);
+ }
+
+ @Test
+ public void testVend() {
+ var now = Instant.parse("2026-04-26T12:00:00Z");
+ var path = new Path("s3://bucket-a/warehouse/table");
+ var request = new StorageAccessRequest(path, EnumSet.of(StorageOperation.READ));
+ var requests = List.of(request);
+ var response = List.of(new VendedStorageCredential(path, Map.of("token", "first"),
+ now.plus(Duration.ofMinutes(20))));
+ var delegate = Mockito.mock(VendedCredentialProvider.class);
+ Mockito.when(delegate.vend("alice", requests))
+ .thenReturn(response)
+ .thenThrow(new AssertionError("The second request should be served from cache"));
+ var provider = new CachedVendedCredentialProvider(
+ delegate,
+ 100,
+ Duration.ofMinutes(30),
+ Clock.fixed(now, ZoneOffset.UTC));
+
+ var first = provider.vend("alice", requests);
+ var second = provider.vend("alice", requests);
+ Assert.assertSame(first, second);
+
+ Mockito.verify(delegate, Mockito.times(1)).vend("alice", requests);
+ Mockito.verifyNoMoreInteractions(delegate);
+ }
+
+ @Test
+ public void testVendWithDifferentPrincipals() {
+ var now = Instant.parse("2026-04-26T12:00:00Z");
+ var path = new Path("s3://bucket-a/warehouse/table");
+ var request = new StorageAccessRequest(path, EnumSet.of(StorageOperation.READ));
+ var requests = List.of(request);
+ var response = List.of(new VendedStorageCredential(path, Map.of("token", "first"),
+ now.plus(Duration.ofMinutes(20))));
+ var delegate = Mockito.mock(VendedCredentialProvider.class);
+ Mockito.when(delegate.vend("alice", requests))
+ .thenReturn(response)
+ .thenThrow(new AssertionError("The second request should be served from cache"));
+ Mockito.when(delegate.vend("bob", requests))
+ .thenReturn(response)
+ .thenThrow(new AssertionError("The second request should be served from cache"));
+ var provider = new CachedVendedCredentialProvider(
+ delegate,
+ 100,
+ Duration.ofMinutes(30),
+ Clock.fixed(now, ZoneOffset.UTC));
+
+ var alice = provider.vend("alice", requests);
+ var bob = provider.vend("bob", requests);
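+    // The mocked delegate returns the same response instance for both principals; the
+    // per-principal verifications below are what prove the cache is keyed by username.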
+ Assert.assertSame(alice, bob);
+
+ Mockito.verify(delegate, Mockito.times(1)).vend("alice", requests);
+ Mockito.verify(delegate, Mockito.times(1)).vend("bob", requests);
+ Mockito.verifyNoMoreInteractions(delegate);
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/TestCompositeVendedCredentialProvider.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/TestCompositeVendedCredentialProvider.java
new file mode 100644
index 000000000000..8f44085c4f39
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/TestCompositeVendedCredentialProvider.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.time.Instant;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+@Category(MetastoreUnitTest.class)
+public class TestCompositeVendedCredentialProvider {
+ @Test
+ public void testVendRoutesRequestsToSupportingProviders() {
+ var firstPath = new Path("s3://bucket-a/warehouse/table-a");
+ var firstRequest = new StorageAccessRequest(firstPath, EnumSet.of(StorageOperation.READ));
+ var secondPath = new Path("s3://bucket-b/warehouse/table-b");
+ var secondRequest = new StorageAccessRequest(secondPath, EnumSet.of(StorageOperation.READ));
+ var thirdPath = new Path("s3://bucket-a/warehouse/table-c");
+ var thirdRequest = new StorageAccessRequest(thirdPath, EnumSet.of(StorageOperation.READ));
+ var requests = List.of(firstRequest, secondRequest, thirdRequest);
+ var expiration = Instant.MAX;
+
+ var firstProvider = Mockito.mock(VendedCredentialProvider.class);
+ Mockito.when(firstProvider.supports(firstRequest)).thenReturn(true);
+ Mockito.when(firstProvider.supports(secondRequest)).thenReturn(false);
+ Mockito.when(firstProvider.supports(thirdRequest)).thenReturn(true);
+ var firstCredentials = List.of(
+ new VendedStorageCredential(firstPath, Map.of("provider", "first"), expiration),
+ new VendedStorageCredential(thirdPath, Map.of("provider", "first"), expiration)
+ );
+ Mockito.when(firstProvider.vend("alice", List.of(firstRequest, thirdRequest))).thenReturn(firstCredentials);
+
+ var secondProvider = Mockito.mock(VendedCredentialProvider.class);
+ Mockito.when(secondProvider.supports(secondRequest)).thenReturn(true);
+    var secondCredentials = List.of(new VendedStorageCredential(secondPath, Map.of("provider", "second"), expiration));
+ Mockito.when(secondProvider.vend("alice", List.of(secondRequest))).thenReturn(secondCredentials);
+
+ var provider = new CompositeVendedCredentialProvider(List.of(firstProvider, secondProvider));
+
+ var credentials = provider.vend("alice", requests);
+ var expected = Stream.concat(firstCredentials.stream(), secondCredentials.stream()).toList();
+ Assert.assertEquals(expected, credentials);
+
+ Mockito.verify(firstProvider).supports(firstRequest);
+ Mockito.verify(firstProvider).supports(secondRequest);
+ Mockito.verify(firstProvider).supports(thirdRequest);
+ Mockito.verify(firstProvider).vend("alice", List.of(firstRequest, thirdRequest));
+ Mockito.verify(secondProvider).supports(secondRequest);
+ Mockito.verify(secondProvider).vend("alice", List.of(secondRequest));
+ Mockito.verifyNoMoreInteractions(firstProvider, secondProvider);
+ }
+
+ @Test
+ public void testVendUsesFirstSupportingProvider() {
+ var path = new Path("s3://bucket-a/warehouse/table-a");
+ var request = new StorageAccessRequest(path, EnumSet.of(StorageOperation.READ));
+ var requests = List.of(request);
+
+ var firstProvider = Mockito.mock(VendedCredentialProvider.class);
+ Mockito.when(firstProvider.supports(request)).thenReturn(true);
+ var expected = List.of(new VendedStorageCredential(path, Map.of("provider", "first"), Instant.MAX));
+ Mockito.when(firstProvider.vend("alice", requests)).thenReturn(expected);
+
+ var secondProvider = Mockito.mock(VendedCredentialProvider.class);
+
+ var provider = new CompositeVendedCredentialProvider(List.of(firstProvider, secondProvider));
+
+ var credentials = provider.vend("alice", requests);
+ Assert.assertEquals(expected, credentials);
+
+ Mockito.verify(firstProvider).supports(request);
+ Mockito.verify(firstProvider).vend("alice", requests);
+ Mockito.verifyNoInteractions(secondProvider);
+ }
+
+ @Test
+ public void testVendFallsBackToNoopProviderWhenNoProviderSupportsRequest() {
+ var request = new StorageAccessRequest(
+ new Path("s3://bucket-a/warehouse/table-a"),
+ EnumSet.of(StorageOperation.READ));
+ var requests = List.of(request);
+
+ var firstProvider = Mockito.mock(VendedCredentialProvider.class);
+ Mockito.when(firstProvider.supports(request)).thenReturn(false);
+
+ var secondProvider = Mockito.mock(VendedCredentialProvider.class);
+ Mockito.when(secondProvider.supports(request)).thenReturn(false);
+
+ var provider = new CompositeVendedCredentialProvider(List.of(firstProvider, secondProvider));
+
+ var credentials = provider.vend("alice", requests);
+
+ Assert.assertTrue(credentials.isEmpty());
+ Mockito.verify(firstProvider).supports(request);
+ Mockito.verify(secondProvider).supports(request);
+ Mockito.verifyNoMoreInteractions(firstProvider, secondProvider);
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3Location.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3Location.java
new file mode 100644
index 000000000000..6a149b7ce413
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3Location.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential.s3;
+
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+@Category(MetastoreUnitTest.class)
+public class TestS3Location {
+ @Test
+ public void test() {
+ var location = S3Location.create("aws", URI.create("s3://bucket/warehouse/tbl")).orElseThrow();
+ Assert.assertEquals("arn:aws:s3:::bucket", location.getBucketArn().toString());
+ Assert.assertEquals("arn:aws:s3:::bucket/warehouse/tbl/*", location.getWildCardArn().toString());
+ Assert.assertEquals("warehouse/tbl/*", location.getWildCardPath());
+ }
+
+ @Test
+ public void testTrailingSlash() {
+ var location = S3Location.create("aws", URI.create("s3://bucket/warehouse/tbl/")).orElseThrow();
+ Assert.assertEquals("arn:aws:s3:::bucket", location.getBucketArn().toString());
+ Assert.assertEquals("arn:aws:s3:::bucket/warehouse/tbl/*", location.getWildCardArn().toString());
+ Assert.assertEquals("warehouse/tbl/*", location.getWildCardPath());
+ }
+
+ @Test
+ public void testRoot() {
+ var location = S3Location.create("aws", URI.create("s3://bucket")).orElseThrow();
+ Assert.assertEquals("arn:aws:s3:::bucket", location.getBucketArn().toString());
+ Assert.assertEquals("arn:aws:s3:::bucket/*", location.getWildCardArn().toString());
+ Assert.assertEquals("*", location.getWildCardPath());
+ }
+
+ @Test
+ public void testRootWithTrailingSlash() {
+ var location = S3Location.create("aws", URI.create("s3://bucket/")).orElseThrow();
+ Assert.assertEquals("arn:aws:s3:::bucket", location.getBucketArn().toString());
+ Assert.assertEquals("arn:aws:s3:::bucket/*", location.getWildCardArn().toString());
+ Assert.assertEquals("*", location.getWildCardPath());
+ }
+
+ @Test
+ public void testS3A() {
+ var location = S3Location.create("aws", URI.create("s3a://bucket/warehouse/tbl")).orElseThrow();
+ Assert.assertEquals("arn:aws:s3:::bucket", location.getBucketArn().toString());
+ Assert.assertEquals("arn:aws:s3:::bucket/warehouse/tbl/*", location.getWildCardArn().toString());
+ Assert.assertEquals("warehouse/tbl/*", location.getWildCardPath());
+ }
+
+ @Test
+ public void testS3N() {
+ var location = S3Location.create("aws", URI.create("s3n://bucket/warehouse/tbl")).orElseThrow();
+ Assert.assertEquals("arn:aws:s3:::bucket", location.getBucketArn().toString());
+ Assert.assertEquals("arn:aws:s3:::bucket/warehouse/tbl/*", location.getWildCardArn().toString());
+ Assert.assertEquals("warehouse/tbl/*", location.getWildCardPath());
+ }
+
+ @Test
+ public void testPartition() {
+ var location = S3Location.create("aws-us-gov", URI.create("s3://bucket/warehouse/tbl")).orElseThrow();
+ Assert.assertEquals("arn:aws-us-gov:s3:::bucket", location.getBucketArn().toString());
+ Assert.assertEquals("arn:aws-us-gov:s3:::bucket/warehouse/tbl/*", location.getWildCardArn().toString());
+ Assert.assertEquals("warehouse/tbl/*", location.getWildCardPath());
+ }
+
+ @Test
+ public void testMatchesArnPrefix() {
+ var location = S3Location.create("aws", URI.create("s3://bucket/warehouse/tbl")).orElseThrow();
+
+ Assert.assertTrue(location.matches("arn:aws:s3:::bucket"));
+ Assert.assertTrue(location.matches("arn:aws:s3:::bucket/warehouse/"));
+ Assert.assertFalse(location.matches("arn:aws:s3:::bucket/curated/"));
+ }
+
+ @Test
+ public void testMatchesBucketPrefix() {
+ var location = S3Location.create("aws", URI.create("s3://bucket/warehouse/tbl")).orElseThrow();
+
+ Assert.assertTrue(location.matches("bucket"));
+ Assert.assertTrue(location.matches("bucket/warehouse/"));
+ Assert.assertFalse(location.matches("bucket/curated/"));
+ }
+
+ @Test
+ public void testUnsupportedPaths() {
+ Assert.assertTrue(S3Location.create("aws", URI.create("/bucket/warehouse/tbl")).isEmpty());
+ Assert.assertTrue(S3Location.create("aws", URI.create("s3b://bucket/warehouse/tbl")).isEmpty());
+ Assert.assertTrue(S3Location.create("aws", URI.create("s3:///warehouse/tbl")).isEmpty());
+ Assert.assertTrue(S3Location.create("aws", URI.create("s3:bucket")).isEmpty());
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3VendedCredentialProvider.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3VendedCredentialProvider.java
new file mode 100644
index 000000000000..657cae594e15
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3VendedCredentialProvider.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential.s3;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.credential.StorageAccessRequest;
+import org.apache.hadoop.hive.metastore.credential.StorageOperation;
+import org.apache.hadoop.hive.metastore.credential.VendedStorageCredential;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import software.amazon.awssdk.arns.Arn;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+@Category(MetastoreUnitTest.class)
+public class TestS3VendedCredentialProvider {
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ @Test
+ public void testSupportsWhenPrefixesAreEmpty() {
+ var provider = new S3VendedCredentialProvider(
+ Arn.fromString("arn:aws:iam::123456789012:role/test-role"),
+ null,
+ Collections.emptyList(),
+ 3600,
+ Mockito.mock(StsClient.class));
+
+ Assert.assertTrue(provider.supports(new StorageAccessRequest(
+ new Path("s3://bucket-a/warehouse/table"),
+ EnumSet.of(StorageOperation.READ))));
+ }
+
+ @Test
+ public void testSupportsWithPrefix() {
+ var provider = new S3VendedCredentialProvider(
+ Arn.fromString("arn:aws:iam::123456789012:role/test-role"),
+ null,
+ List.of("bucket-a/warehouse/", "bucket-x/hive/"),
+ 3600,
+ Mockito.mock(StsClient.class));
+
+ var matched = new StorageAccessRequest(
+ new Path("s3a://bucket-a/warehouse/table"),
+ EnumSet.of(StorageOperation.READ));
+ Assert.assertTrue(provider.supports(matched));
+ var unmatched = new StorageAccessRequest(
+ new Path("s3a://bucket-b/warehouse/table"),
+ EnumSet.of(StorageOperation.READ));
+ Assert.assertFalse(provider.supports(unmatched));
+ var requests = Collections.singletonList(unmatched);
+ Assert.assertThrows(IllegalArgumentException.class, () -> provider.vend("test-user", requests));
+ }
+
+ @Test
+ public void testSupportsWithArnPrefix() {
+ var provider = new S3VendedCredentialProvider(
+ Arn.fromString("arn:aws:iam::123456789012:role/test-role"),
+ null,
+ List.of("arn:aws:s3:::bucket-a/warehouse/", "arn:aws:s3:::bucket-x/hive/"),
+ 3600,
+ Mockito.mock(StsClient.class));
+
+ var matched = new StorageAccessRequest(
+ new Path("s3a://bucket-a/warehouse/table"),
+ EnumSet.of(StorageOperation.READ));
+ Assert.assertTrue(provider.supports(matched));
+ var unmatched = new StorageAccessRequest(
+ new Path("s3a://bucket-b/warehouse/table"),
+ EnumSet.of(StorageOperation.READ));
+ Assert.assertFalse(provider.supports(unmatched));
+ Assert.assertThrows(IllegalArgumentException.class,
+ () -> provider.vend("test-user", Collections.singletonList(unmatched)));
+ }
+
+ @Test
+ public void testSupportsWithUnsupportedPaths() {
+ var provider = new S3VendedCredentialProvider(
+ Arn.fromString("arn:aws:iam::123456789012:role/test-role"),
+ null,
+ Collections.emptyList(),
+ 3600,
+ Mockito.mock(StsClient.class));
+
+ var nonSchema = new StorageAccessRequest(
+ new Path("/bucket-a/warehouse/table"),
+ EnumSet.of(StorageOperation.READ));
+ Assert.assertFalse(provider.supports(nonSchema));
+
+ var nonS3 = new StorageAccessRequest(
+ new Path("hdfs://bucket-a/warehouse/table"),
+ EnumSet.of(StorageOperation.READ));
+ Assert.assertFalse(provider.supports(nonS3));
+
+ var nonAuthority = new StorageAccessRequest(
+ new Path("s3a:///warehouse/table"),
+ EnumSet.of(StorageOperation.READ));
+ Assert.assertFalse(provider.supports(nonAuthority));
+ }
+
+ @Test
+ public void testVend() throws Exception {
+ var stsClient = Mockito.mock(StsClient.class);
+ var requestCaptor = ArgumentCaptor.forClass(AssumeRoleRequest.class);
+ var accessKey = "dummy-access-key";
+ var secretKey = "dummy-secret-key";
+ var sessionToken = "dummy-session-token";
+ var expiration = Instant.parse("2026-04-26T12:00:00Z");
+ Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn(
+ AssumeRoleResponse.builder().credentials(
+ Credentials.builder()
+ .accessKeyId(accessKey)
+ .secretAccessKey(secretKey)
+ .sessionToken(sessionToken)
+ .expiration(expiration)
+ .build())
+ .build());
+
+ var provider = new S3VendedCredentialProvider(
+ Arn.fromString("arn:aws-us-gov:iam::123456789012:role/test-role"),
+ "external-id",
+ Collections.emptyList(),
+ 1200,
+ stsClient);
+ var credentials = provider.vend(
+ "User Name+1@example.com",
+ List.of(
+ new StorageAccessRequest(
+ new Path("s3://bucket-realtime/warehouse/table"),
+ EnumSet.of(StorageOperation.LIST, StorageOperation.READ, StorageOperation.CREATE,
+ StorageOperation.DELETE)),
+ new StorageAccessRequest(
+ new Path("s3n://bucket-archive/warehouse/table"),
+ EnumSet.of(StorageOperation.LIST, StorageOperation.READ))
+ )
+ );
+
+ var expected = List.of(
+ new VendedStorageCredential(
+ new Path("s3://bucket-realtime/warehouse/table"),
+ Map.of(
+ "s3.access-key-id", accessKey,
+ "s3.secret-access-key", secretKey,
+ "s3.session-token", sessionToken,
+ "s3.session-token-expires-at-ms", "1777204800000"
+ ),
+ expiration
+ ),
+ new VendedStorageCredential(
+ new Path("s3n://bucket-archive/warehouse/table"),
+ Map.of(
+ "s3.access-key-id", accessKey,
+ "s3.secret-access-key", secretKey,
+ "s3.session-token", sessionToken,
+ "s3.session-token-expires-at-ms", "1777204800000"
+ ),
+ expiration
+ )
+ );
+ Assert.assertEquals(expected, credentials);
+
+ var request = requestCaptor.getValue();
+ Assert.assertEquals("arn:aws-us-gov:iam::123456789012:role/test-role", request.roleArn());
+ Assert.assertEquals("external-id", request.externalId());
+ Assert.assertEquals(Integer.valueOf(1200), request.durationSeconds());
+ Assert.assertEquals("hms_User-Name-1@example.com", request.roleSessionName());
+
+ Assert.assertEquals(
+ MAPPER.readTree("""
+ {
+ "Version": "2012-10-17",
+ "Statement": [
+ {
+ "Effect": "Allow",
+ "Action": "s3:GetBucketLocation",
+ "Resource": "arn:aws-us-gov:s3:::bucket-archive"
+ },
+ {
+ "Effect": "Allow",
+ "Action": "s3:GetBucketLocation",
+ "Resource": "arn:aws-us-gov:s3:::bucket-realtime"
+ },
+ {
+ "Effect": "Allow",
+ "Action": "s3:ListBucket",
+ "Resource": "arn:aws-us-gov:s3:::bucket-archive",
+ "Condition": {
+ "StringLike": {
+ "s3:prefix": "warehouse/table/*"
+ }
+ }
+ },
+ {
+ "Effect": "Allow",
+ "Action": "s3:ListBucket",
+ "Resource": "arn:aws-us-gov:s3:::bucket-realtime",
+ "Condition": {
+ "StringLike": {
+ "s3:prefix": "warehouse/table/*"
+ }
+ }
+ },
+ {
+ "Effect": "Allow",
+ "Action": [
+ "s3:GetObject",
+ "s3:GetObjectVersion"
+ ],
+ "Resource": [
+ "arn:aws-us-gov:s3:::bucket-realtime/warehouse/table/*",
+ "arn:aws-us-gov:s3:::bucket-archive/warehouse/table/*"
+ ]
+ },
+ {
+ "Effect": "Allow",
+ "Action": "s3:PutObject",
+ "Resource": "arn:aws-us-gov:s3:::bucket-realtime/warehouse/table/*"
+ },
+ {
+ "Effect": "Allow",
+ "Action": "s3:DeleteObject",
+ "Resource": "arn:aws-us-gov:s3:::bucket-realtime/warehouse/table/*"
+ }
+ ]
+ }
+ """),
+ MAPPER.readTree(request.policy()));
+ }
+
+ @Test
+ public void testVendWithoutExpiration() throws Exception {
+ var stsClient = Mockito.mock(StsClient.class);
+ var requestCaptor = ArgumentCaptor.forClass(AssumeRoleRequest.class);
+ var accessKey = "dummy-access-key";
+ var secretKey = "dummy-secret-key";
+ var sessionToken = "dummy-session-token";
+ Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn(
+ AssumeRoleResponse.builder().credentials(
+ Credentials.builder()
+ .accessKeyId(accessKey)
+ .secretAccessKey(secretKey)
+ .sessionToken(sessionToken)
+ .build())
+ .build());
+
+ var provider = new S3VendedCredentialProvider(
+ Arn.fromString("arn:aws:iam::123456789012:role/test-role"),
+ null,
+ Collections.emptyList(),
+ 3600,
+ stsClient);
+ var credentials = provider.vend(
+ "user",
+ List.of(
+ new StorageAccessRequest(
+ new Path("s3a://bucket-a/warehouse/table"),
+ EnumSet.of(StorageOperation.READ))
+ )
+ );
+
+ var expected = new VendedStorageCredential(
+ new Path("s3a://bucket-a/warehouse/table"),
+ Map.of(
+ "s3.access-key-id", accessKey,
+ "s3.secret-access-key", secretKey,
+ "s3.session-token", sessionToken
+ ),
+ Instant.MAX
+ );
+ Assert.assertEquals(List.of(expected), credentials);
+
+ var request = requestCaptor.getValue();
+ Assert.assertEquals("arn:aws:iam::123456789012:role/test-role", request.roleArn());
+ Assert.assertNull(request.externalId());
+ Assert.assertEquals(Integer.valueOf(3600), request.durationSeconds());
+ Assert.assertEquals("hms_user", request.roleSessionName());
+
+ Assert.assertEquals(
+ MAPPER.readTree("""
+ {
+ "Version": "2012-10-17",
+ "Statement": [
+ {
+ "Effect": "Allow",
+ "Action": "s3:GetBucketLocation",
+ "Resource": "arn:aws:s3:::bucket-a"
+ },
+ {
+ "Effect": "Allow",
+ "Action": [
+ "s3:GetObject",
+ "s3:GetObjectVersion"
+ ],
+ "Resource": "arn:aws:s3:::bucket-a/warehouse/table/*"
+ }
+ ]
+ }
+ """),
+ MAPPER.readTree(request.policy()));
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3VendedCredentialProviderIntegration.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3VendedCredentialProviderIntegration.java
new file mode 100644
index 000000000000..a5dd72f46e85
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3VendedCredentialProviderIntegration.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential.s3;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreExternalTest;
+import org.apache.hadoop.hive.metastore.credential.StorageAccessRequest;
+import org.apache.hadoop.hive.metastore.credential.StorageOperation;
+import org.apache.hadoop.hive.metastore.credential.VendedStorageCredential;
+import org.apache.hadoop.hive.metastore.testutils.AwsS3IntegrationTestConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import software.amazon.awssdk.arns.Arn;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AccessDeniedException;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.sts.StsClient;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.UUID;
+
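+/**
+ * End-to-end test that assumes a real IAM role and verifies the vended, down-scoped credential
+ * permits exactly the requested operations; see AwsS3IntegrationTestConfig for the required
+ * environment variables.
+ */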
+@Category(MetastoreExternalTest.class)
+public class TestS3VendedCredentialProviderIntegration {
+ private static final String READABLE_FILE_CONTENT = "this-is-read-only";
+ private static final String WRITABLE_FILE_CONTENT = "this-is-deletable";
+
+ private AwsS3IntegrationTestConfig config;
+ private S3Client adminS3;
+ private StsClient stsClient;
+ private String readOnlyPrefix;
+ private String readableKey;
+ private String readableKeyVersion;
+ private String writablePrefix;
+ private String writableKey;
+ private String writableKeyVersion;
+ private String deniedPrefix;
+ private String deniedKey;
+
+ private StsClient createStsClient() {
+ stsClient = StsClient.builder().region(config.region()).build();
+ return stsClient;
+ }
+
+ private static S3Client createSessionS3Client(Region region, VendedStorageCredential credential) {
+ var sessionCredentials = AwsSessionCredentials.create(
+ credential.credentials().get("s3.access-key-id"),
+ credential.credentials().get("s3.secret-access-key"),
+ credential.credentials().get("s3.session-token"));
+ return S3Client.builder()
+ .region(region)
+ .credentialsProvider(StaticCredentialsProvider.create(sessionCredentials))
+ .build();
+ }
+
+ private static void deleteObjectIfExists(S3Client s3, String bucket, String key) {
+ try {
+ s3.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build());
+ } catch (S3Exception e) {
+ if (e.statusCode() != 404) {
+ throw e;
+ }
+ }
+ }
+
+ private void assertForbidden(Runnable command) {
+ Assert.assertThrows(AccessDeniedException.class, command::run);
+ }
+
+ private void assertNotFound(Runnable command) {
+ Assert.assertThrows(NoSuchKeyException.class, command::run);
+ }
+
+ @Before
+ public void setUp() {
+ Assume.assumeTrue("Test configurations are not available", AwsS3IntegrationTestConfig.isConfigured());
+
+ config = AwsS3IntegrationTestConfig.fromEnvironment();
+ adminS3 = S3Client.builder().region(config.region()).build();
+
+ var rootPrefix = "%s/%s".formatted(config.basePath(), UUID.randomUUID());
+ readOnlyPrefix = rootPrefix + "/read-only";
+ readableKey = readOnlyPrefix + "/readable.txt";
+ writablePrefix = rootPrefix + "/read-write";
+ writableKey = writablePrefix + "/writable.txt";
+ deniedPrefix = rootPrefix + "/denied";
+ deniedKey = deniedPrefix + "/denied/unreadable.txt";
+
+ readableKeyVersion = adminS3.putObject(
+ PutObjectRequest.builder().bucket(config.bucket()).key(readableKey).build(),
+ RequestBody.fromString(READABLE_FILE_CONTENT)).versionId();
+ Assert.assertNotNull(readableKeyVersion);
+ writableKeyVersion = adminS3.putObject(
+ PutObjectRequest.builder().bucket(config.bucket()).key(writableKey).build(),
+ RequestBody.fromString(WRITABLE_FILE_CONTENT)).versionId();
+ Assert.assertNotNull(writableKeyVersion);
+ adminS3.putObject(
+ PutObjectRequest.builder().bucket(config.bucket()).key(deniedKey).build(),
+ RequestBody.fromString("outside-scope"));
+ }
+
+ @After
+ public void tearDown() {
+ if (adminS3 != null) {
+ deleteObjectIfExists(adminS3, config.bucket(), readableKey);
+ deleteObjectIfExists(adminS3, config.bucket(), writableKey);
+ deleteObjectIfExists(adminS3, config.bucket(), deniedKey);
+ adminS3.close();
+ }
+ if (stsClient != null) {
+ stsClient.close();
+ }
+ }
+
+ @Test
+ public void testVend() {
+ var provider = new S3VendedCredentialProvider(
+ Arn.fromString(config.roleArn()),
+ config.externalId(),
+ List.of(),
+ 900,
+ createStsClient());
+
+ var requests = List.of(
+ new StorageAccessRequest(
+ new Path("s3://%s/%s".formatted(config.bucket(), readOnlyPrefix)),
+ EnumSet.of(StorageOperation.LIST, StorageOperation.READ)),
+ new StorageAccessRequest(
+ new Path("s3a://%s/%s".formatted(config.bucket(), writablePrefix)),
+ EnumSet.of(StorageOperation.LIST, StorageOperation.READ, StorageOperation.CREATE, StorageOperation.DELETE))
+ );
+
+ var credentials = provider.vend("integration-test-user", requests);
+ Assert.assertEquals(2, credentials.size());
+ // This provider gets a merged credential for all access requests
+ Assert.assertEquals(credentials.get(0).credentials(), credentials.get(1).credentials());
+
+ try (var sessionS3 = createSessionS3Client(config.region(), credentials.getFirst())) {
+ var location = sessionS3.getBucketLocation(GetBucketLocationRequest.builder().bucket(config.bucket()).build());
+ Assert.assertEquals("", location.locationConstraintAsString());
+
+ // Readable path
+      var listReadable = sessionS3.listObjectsV2(
+ ListObjectsV2Request.builder().bucket(config.bucket()).prefix(readOnlyPrefix + "/").build())
+ .contents().stream().map(S3Object::key).toList();
+ Assert.assertEquals(List.of(readableKey), listReadable);
+
+ var getReadable = sessionS3.getObjectAsBytes(
+ GetObjectRequest.builder().bucket(config.bucket()).key(readableKey).versionId(readableKeyVersion).build()
+ ).asUtf8String();
+ Assert.assertEquals(READABLE_FILE_CONTENT, getReadable);
+
+ assertForbidden(() -> sessionS3 .putObject(
+ PutObjectRequest.builder().bucket(config.bucket()).key(readOnlyPrefix + "/test-put-readable.txt").build(),
+ RequestBody.fromString("test-put-readable")));
+
+ assertForbidden(() -> sessionS3 .deleteObject(
+ DeleteObjectRequest.builder().bucket(config.bucket()).key(readableKey).build()
+ ));
+
+ // Writable path
+      var listWritable = sessionS3.listObjectsV2(
+ ListObjectsV2Request.builder().bucket(config.bucket()).prefix(writablePrefix + "/").build())
+ .contents().stream().map(S3Object::key).toList();
+ Assert.assertEquals(List.of(writableKey), listWritable);
+
+ var getWritable = sessionS3.getObjectAsBytes(
+ GetObjectRequest.builder().bucket(config.bucket()).key(writableKey).versionId(writableKeyVersion).build()
+ ).asUtf8String();
+ Assert.assertEquals(WRITABLE_FILE_CONTENT, getWritable);
+
+ sessionS3.putObject(
+ PutObjectRequest.builder().bucket(config.bucket()).key(writablePrefix + "/test-put-writable.txt").build(),
+ RequestBody.fromString("test-put-writable")
+ );
+ Assert.assertEquals("test-put-writable", sessionS3.getObjectAsBytes(
+ GetObjectRequest.builder().bucket(config.bucket()).key(writablePrefix + "/test-put-writable.txt").build()
+ ).asUtf8String());
+
+ sessionS3.deleteObject(
+ DeleteObjectRequest.builder().bucket(config.bucket()).key(writableKey).build());
+ assertNotFound(() -> sessionS3.getObject(
+ GetObjectRequest.builder().bucket(config.bucket()).key(writableKey).build()));
+
+ // Denied path
+ assertForbidden(() -> sessionS3.listObjectsV2(
+ ListObjectsV2Request.builder().bucket(config.bucket()).prefix(deniedPrefix + "/").build()));
+ assertForbidden(() -> sessionS3.getObject(
+ GetObjectRequest.builder().bucket(config.bucket()).key(deniedKey).build()));
+ assertForbidden(() -> sessionS3.putObject(
+ PutObjectRequest.builder().bucket(config.bucket()).key(deniedPrefix + "/test-put-denied.txt").build(),
+ RequestBody.fromString("test-put-denied")));
+ assertForbidden(() -> sessionS3.deleteObject(
+ DeleteObjectRequest.builder().bucket(config.bucket()).key(deniedKey).build()));
+ }
+ }
+}
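
The flow exercised above — one STS AssumeRole call whose inline session policy covers the union of the requested prefixes, then an S3 client built from the returned temporary credentials — can be sketched directly against the AWS SDK v2. The helper below illustrates that technique only; the session name, policy argument, and class shape are assumptions, not the provider's actual implementation:

    import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.sts.StsClient;
    import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
    import software.amazon.awssdk.services.sts.model.Credentials;

    final class StsVendSketch {
      // Assume the vending role with an inline session policy that narrows the
      // role's permissions to the requested prefixes, then build an S3 client
      // from the resulting temporary credentials.
      static S3Client sessionClient(StsClient sts, String roleArn, String externalId,
                                    String sessionPolicyJson, Region region) {
        Credentials creds = sts.assumeRole(AssumeRoleRequest.builder()
                .roleArn(roleArn)
                .roleSessionName("integration-test-user")
                .externalId(externalId)
                .durationSeconds(900)        // minimum duration STS accepts
                .policy(sessionPolicyJson)   // scope-down policy for this session
                .build())
            .credentials();
        return S3Client.builder()
            .region(region)
            .credentialsProvider(StaticCredentialsProvider.create(AwsSessionCredentials.create(
                creds.accessKeyId(), creds.secretAccessKey(), creds.sessionToken())))
            .build();
      }
    }

Issuing one session per vend call, as the merged-credential assertion above checks, keeps every vend to a single AssumeRole round trip, at the cost of a larger inline policy (STS caps inline session policies at 2,048 characters).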
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/testutils/AwsS3IntegrationTestConfig.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/testutils/AwsS3IntegrationTestConfig.java
new file mode 100644
index 000000000000..7014803b3ce4
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/testutils/AwsS3IntegrationTestConfig.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.testutils;
+
+import software.amazon.awssdk.regions.Region;
+
+/**
+ * AWS integration tests don't run by default; configure your environment as shown below.
+ *
+ * ACCOUNT_ID={your AWS account ID}
+ * HMS_PRINCIPAL_ARN="arn:aws:iam::${ACCOUNT_ID}:{user or role}"
+ *
+ * REGION=us-east-1
+ * BUCKET={base bucket name}
+ * ROLE_NAME=hive-s3-vending-test-role
+ * export HIVE_IT_AWS_INTEGRATION_TEST_ENABLED=true
+ * export HIVE_IT_S3_BUCKET="$BUCKET-$ACCOUNT_ID-$REGION-an"
+ * export HIVE_IT_S3_TEST_PATH=hive-test
+ * export HIVE_IT_S3_ROLE_ARN="arn:aws:iam::${ACCOUNT_ID}:role/$ROLE_NAME"
+ * export HIVE_IT_S3_EXTERNAL_ID=hive-s3-vending-test
+ * export HIVE_IT_S3_REGION=us-east-1
+ *
+ * aws s3api create-bucket \
+ * --bucket "${HIVE_IT_S3_BUCKET}" \
+ * --region "${REGION}" \
+ * --bucket-namespace account-regional
+ *
+ * aws s3api put-bucket-versioning \
+ * --bucket "${HIVE_IT_S3_BUCKET}" \
+ * --versioning-configuration Status=Enabled
+ *
+ * cat > trust-policy.json <<EOF
+ * { ... }
+ * EOF
+ *
+ * cat > role-policy.json <<EOF
+ * { ... }
+ * EOF
+ */
[... remainder of AwsS3IntegrationTestConfig.java elided ...]
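A plausible reconstruction of the elided body, inferred from the accessors the tests call and the HIVE_IT_* variables documented above (field names and parsing are assumptions, not the committed implementation):

    // Sketch only: inferred from test usage.
    public record AwsS3IntegrationTestConfig(String bucket, String basePath, String roleArn,
        String externalId, Region region) {

      public static boolean isConfigured() {
        return Boolean.parseBoolean(System.getenv("HIVE_IT_AWS_INTEGRATION_TEST_ENABLED"));
      }

      public static AwsS3IntegrationTestConfig fromEnvironment() {
        return new AwsS3IntegrationTestConfig(
            System.getenv("HIVE_IT_S3_BUCKET"),
            System.getenv("HIVE_IT_S3_TEST_PATH"),
            System.getenv("HIVE_IT_S3_ROLE_ARN"),
            System.getenv("HIVE_IT_S3_EXTERNAL_ID"),
            Region.of(System.getenv("HIVE_IT_S3_REGION")));
      }
    }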
diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml
--- a/standalone-metastore/pom.xml
+++ b/standalone-metastore/pom.xml
@@ ... @@
     <hive.version>4.3.0-SNAPSHOT</hive.version>
+    <aws.sdk.version>2.42.25</aws.sdk.version>
@@ -140,6 +141,31 @@
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>arns</artifactId>
+        <version>${aws.sdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>bundle</artifactId>
+        <version>${aws.sdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>iam-policy-builder</artifactId>
+        <version>${aws.sdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>sts</artifactId>
+        <version>${aws.sdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>url-connection-client</artifactId>
+        <version>${aws.sdk.version}</version>
+      </dependency>
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
@@ -239,6 +265,11 @@
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-aws</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>