diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 10015f74837c..37607d55b0be 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1927,6 +1927,10 @@ public enum ConfVars {
         "The pattern to extract a user name. This is effective when you use RegexPrincipalMapper. For example, if " +
             "you want to extract a user name from the local part of the email claim, set this to (.*)@example.com."
     ),
+    CATALOG_VENDED_CREDENTIALS_PROVIDERS("metastore.catalog.vended-credentials.providers",
+        "hive.metastore.catalog.vended-credentials.providers", "",
+        "Comma-separated list of credential-vending provider IDs."
+    ),
     ICEBERG_CATALOG_SERVLET_PATH("metastore.iceberg.catalog.servlet.path",
         "hive.metastore.iceberg.catalog.servlet.path", "iceberg",
         "HMS Iceberg Catalog servlet path component of URL endpoint."
@@ -1935,6 +1939,10 @@ public enum ConfVars {
         "hive.metastore.iceberg.catalog.cache.expiry", -1,
         "HMS Iceberg Catalog cache expiry."
     ),
+    ICEBERG_CATALOG_VENDED_CREDENTIALS_ENABLED("metastore.iceberg.catalog.vended-credentials.enabled",
+        "hive.metastore.iceberg.catalog.vended-credentials.enabled", false,
+        "Boolean flag to enable credential vending on the Iceberg REST Catalog."
+    ),
     HTTPSERVER_THREADPOOL_MIN("hive.metastore.httpserver.threadpool.min",
         "hive.metastore.httpserver.threadpool.min", 8,
         "HMS embedded HTTP server minimum number of threads."
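Reviewer note: a minimal sketch of how these two settings combine with the per-provider keys exercised in the integration test further down (`.<id>.class`, `.<id>.aws.*`). The `my-s3` ID and the role ARN are illustrative, not defaults:

```java
Configuration conf = MetastoreConf.newMetastoreConf();
// Turn on vending for the Iceberg REST catalog and register one provider ID.
conf.set("metastore.iceberg.catalog.vended-credentials.enabled", "true");
conf.set("metastore.catalog.vended-credentials.providers", "my-s3");
// Per-provider keys hang off the providers key, as in TestCredentialVendingAws below.
conf.set("metastore.catalog.vended-credentials.providers.my-s3.class",
    "org.apache.hadoop.hive.metastore.credential.s3.S3VendedCredentialProvider");
conf.set("metastore.catalog.vended-credentials.providers.my-s3.aws.role-arn",
    "arn:aws:iam::123456789012:role/example");  // illustrative ARN
```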
diff --git a/standalone-metastore/metastore-rest-catalog/pom.xml b/standalone-metastore/metastore-rest-catalog/pom.xml
index d987f7cce972..5d7327fbef8f 100644
--- a/standalone-metastore/metastore-rest-catalog/pom.xml
+++ b/standalone-metastore/metastore-rest-catalog/pom.xml
@@ -26,6 +26,13 @@
     <iceberg.version>1.10.1</iceberg.version>
   </properties>
   <dependencies>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+      <classifier>core</classifier>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-standalone-metastore-server</artifactId>
@@ -72,10 +79,8 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-exec</artifactId>
-      <version>${hive.version}</version>
-      <classifier>core</classifier>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>bundle</artifactId>
       <scope>test</scope>
     </dependency>
    <dependency>
@@ -99,6 +104,17 @@
      <classifier>tests</classifier>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-aws</artifactId>
+      <version>${iceberg.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <scope>test</scope>
+    </dependency>
    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-core</artifactId>
diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/AccessDelegationMode.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/AccessDelegationMode.java
new file mode 100644
index 000000000000..711b3055aa47
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/AccessDelegationMode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+/**
+ * Possible values for the X-Iceberg-Access-Delegation header.
+ */
+public enum AccessDelegationMode {
+  VENDED_CREDENTIALS, REMOTE_SIGNING
+}
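Reviewer note: on the client side, the delegation mode travels in the `X-Iceberg-Access-Delegation` header; with the Iceberg Java REST client that is expressed as a `header.`-prefixed catalog property, exactly as the integration test below does. A minimal sketch (endpoint URI is illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.rest.RESTCatalog;

Map<String, String> properties = new HashMap<>();
properties.put("uri", "http://hms-host:9090/iceberg");  // illustrative endpoint
properties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
// Ask the server to vend credentials on table load/create/update responses.
properties.put("header.X-Iceberg-Access-Delegation", "vended-credentials");
RESTCatalog catalog = new RESTCatalog();
catalog.initialize("hms", properties);
```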
diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java
index 73d23ae5daf0..d430ab1c249a 100644
--- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java
+++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java
@@ -21,8 +21,10 @@
 import com.google.common.base.Preconditions;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Consumer;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.BaseTransaction;
@@ -72,12 +74,15 @@
 import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Original @ RESTCatalogAdapter.java
  * Adaptor class to translate REST requests into {@link Catalog} API calls.
  */
 public class HMSCatalogAdapter implements RESTClient {
+  private static final Logger LOG = LoggerFactory.getLogger(HMSCatalogAdapter.class);
   private static final Splitter SLASH = Splitter.on('/');
 
   private static final Map<Class<? extends Exception>, Integer> EXCEPTION_ERROR_CODES =
@@ -102,14 +107,15 @@ public class HMSCatalogAdapter implements RESTClient {
 
   private final Catalog catalog;
   private final SupportsNamespaces asNamespaceCatalog;
   private final ViewCatalog asViewCatalog;
+  private final IcebergVendedCredentialProvider credentialProvider;
 
-
-  public HMSCatalogAdapter(Catalog catalog) {
+  HMSCatalogAdapter(Catalog catalog, IcebergVendedCredentialProvider credentialProvider) {
     Preconditions.checkArgument(catalog instanceof SupportsNamespaces);
     Preconditions.checkArgument(catalog instanceof ViewCatalog);
     this.catalog = catalog;
     this.asNamespaceCatalog = (SupportsNamespaces) catalog;
     this.asViewCatalog = (ViewCatalog) catalog;
+    this.credentialProvider = credentialProvider;
   }
 
   enum Route {
@@ -263,18 +269,21 @@ private ListTablesResponse listTables(Map<String, String> vars) {
     return castResponse(ListTablesResponse.class, CatalogHandlers.listTables(catalog, namespace));
   }
 
-  private LoadTableResponse createTable(Map<String, String> vars, Object body) {
+  private LoadTableResponse createTable(Set<AccessDelegationMode> accessDelegationModes, Map<String, String> vars,
+      Object body) {
     final Class<LoadTableResponse> responseType = LoadTableResponse.class;
     Namespace namespace = namespaceFromPathVars(vars);
     CreateTableRequest request = castRequest(CreateTableRequest.class, body);
     request.validate();
+    LoadTableResponse response;
     if (request.stageCreate()) {
-      return castResponse(
-          responseType, CatalogHandlers.stageTableCreate(catalog, namespace, request));
+      response = castResponse(
+          responseType, CatalogHandlers.stageTableCreate(catalog, namespace, request));
     } else {
-      return castResponse(
-          responseType, CatalogHandlers.createTable(catalog, namespace, request));
+      response = castResponse(
+          responseType, CatalogHandlers.createTable(catalog, namespace, request));
     }
+    return withCredentials(accessDelegationModes, TableIdentifier.of(namespace, request.name()), response);
   }
 
   private RESTResponse dropTable(Map<String, String> vars) {
@@ -292,21 +301,33 @@ private RESTResponse tableExists(Map<String, String> vars) {
     return null;
   }
 
-  private LoadTableResponse loadTable(Map<String, String> vars) {
+  private LoadTableResponse loadTable(Set<AccessDelegationMode> delegationModes, Map<String, String> vars) {
     TableIdentifier ident = identFromPathVars(vars);
-    return castResponse(LoadTableResponse.class, CatalogHandlers.loadTable(catalog, ident));
+    LoadTableResponse response =
+        castResponse(LoadTableResponse.class, CatalogHandlers.loadTable(catalog, ident));
+    return withCredentials(delegationModes, ident, response);
   }
 
-  private LoadTableResponse registerTable(Map<String, String> vars, Object body) {
-    Namespace namespace = namespaceFromPathVars(vars);
-    RegisterTableRequest request = castRequest(RegisterTableRequest.class, body);
-    return castResponse(LoadTableResponse.class, CatalogHandlers.registerTable(catalog, namespace, request));
+  private LoadTableResponse registerTable(
+      Set<AccessDelegationMode> delegationModes,
+      Map<String, String> vars,
+      Object body) {
+    Namespace namespace = namespaceFromPathVars(vars);
+    RegisterTableRequest request = castRequest(RegisterTableRequest.class, body);
+    LoadTableResponse response =
+        castResponse(LoadTableResponse.class, CatalogHandlers.registerTable(catalog, namespace, request));
+    return withCredentials(delegationModes, TableIdentifier.of(namespace, request.name()), response);
   }
 
-  private LoadTableResponse updateTable(Map<String, String> vars, Object body) {
+  private LoadTableResponse updateTable(
+      Set<AccessDelegationMode> delegationModes,
+      Map<String, String> vars,
+      Object body) {
     TableIdentifier ident = identFromPathVars(vars);
     UpdateTableRequest request = castRequest(UpdateTableRequest.class, body);
-    return castResponse(LoadTableResponse.class, CatalogHandlers.updateTable(catalog, ident, request));
+    LoadTableResponse response =
+        castResponse(LoadTableResponse.class, CatalogHandlers.updateTable(catalog, ident, request));
+    return withCredentials(delegationModes, ident, response);
   }
 
   private RESTResponse renameTable(Object body) {
@@ -377,6 +398,33 @@ private RESTResponse dropView(Map<String, String> vars) {
     return null;
   }
 
+  private LoadTableResponse withCredentials(
+      Set<AccessDelegationMode> accessDelegationModes,
+      TableIdentifier ident,
+      LoadTableResponse response) {
+    if (credentialProvider == null) {
+      return response;
+    }
+
+    if (accessDelegationModes.contains(AccessDelegationMode.VENDED_CREDENTIALS)) {
+      return withVendedCredentials(ident, response);
+    }
+
+    if (accessDelegationModes.contains(AccessDelegationMode.REMOTE_SIGNING)) {
+      LOG.warn("Remote signing is not supported. Ignoring...");
+    }
+
+    return response;
+  }
+
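+  // Rebuilds the response with credentials vended for the table's location prefix;
+  // table metadata and config from the original response are carried over unchanged.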
+  private LoadTableResponse withVendedCredentials(TableIdentifier ident, LoadTableResponse response) {
+    final var credentials = credentialProvider.vend(ident, response.tableMetadata().location());
+    return LoadTableResponse.builder()
+        .withTableMetadata(response.tableMetadata())
+        .addAllConfig(response.config())
+        .addAllCredentials(credentials).build();
+  }
+
   /**
    * This is a very simplistic approach that only validates the requirements for each table and does
    * not do any other conflict detection. Therefore, it does not guarantee true transactional
@@ -408,7 +456,10 @@ private static void commitTransaction(Catalog catalog, CommitTransactionRequest
 
   @SuppressWarnings({"MethodLength", "unchecked"})
   private <T extends RESTResponse> T handleRequest(
-      Route route, Map<String, String> vars, Object body) {
+      Route route,
+      Set<AccessDelegationMode> accessDelegationModes,
+      Map<String, String> vars,
+      Object body) {
     switch (route) {
       case CONFIG:
         return (T) config();
@@ -435,7 +486,7 @@
         return (T) listTables(vars);
 
       case CREATE_TABLE:
-        return (T) createTable(vars, body);
+        return (T) createTable(accessDelegationModes, vars, body);
 
       case DROP_TABLE:
         return (T) dropTable(vars);
@@ -444,13 +495,13 @@
         return (T) tableExists(vars);
 
       case LOAD_TABLE:
-        return (T) loadTable(vars);
+        return (T) loadTable(accessDelegationModes, vars);
 
       case REGISTER_TABLE:
-        return (T) registerTable(vars, body);
+        return (T) registerTable(accessDelegationModes, vars, body);
 
       case UPDATE_TABLE:
-        return (T) updateTable(vars, body);
+        return (T) updateTable(accessDelegationModes, vars, body);
 
       case RENAME_TABLE:
         return (T) renameTable(body);
@@ -491,6 +542,7 @@
   <T extends RESTResponse> T execute(
       HTTPMethod method,
       String path,
+      Set<AccessDelegationMode> accessDelegationModes,
       Map<String, String> queryParams,
       Object body,
       Consumer<ErrorResponse> errorHandler) {
@@ -503,7 +555,7 @@
       vars.putAll(queryParams);
     }
       vars.putAll(routeAndVars.second());
-      return handleRequest(routeAndVars.first(), vars.build(), body);
+      return handleRequest(routeAndVars.first(), accessDelegationModes, vars.build(), body);
     } catch (RuntimeException e) {
       configureResponseFromException(e, errorBuilder);
     }
@@ -519,6 +571,15 @@
     throw new RESTException("Unhandled error: %s", error);
   }
 
+  <T extends RESTResponse> T execute(
+      HTTPMethod method,
+      String path,
+      Map<String, String> queryParams,
+      Object body,
+      Consumer<ErrorResponse> errorHandler) {
+    return execute(method, path, EnumSet.noneOf(AccessDelegationMode.class), queryParams, body, errorHandler);
+  }
+
   @Override
   public <T extends RESTResponse> T delete(
       String path,
diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
index d21f239f3416..9eb029ccd5a7 100644
--- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
+++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
@@ -112,7 +112,11 @@ private HttpServlet createServlet(Catalog catalog) {
     // Iceberg REST client uses "catalog" by default
     List<String> scopes = Collections.singletonList("catalog");
     ServletSecurity security = new ServletSecurity(AuthType.fromString(authType), configuration, req -> scopes);
-    return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalog)));
+    IcebergVendedCredentialProvider vendedCredentialProvider = null;
+    if (MetastoreConf.getBoolVar(configuration, ConfVars.ICEBERG_CATALOG_VENDED_CREDENTIALS_ENABLED)) {
+      vendedCredentialProvider = new IcebergVendedCredentialProvider(configuration);
+    }
+    return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalog, vendedCredentialProvider)));
   }
 
   /**
diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java
index 6140f40b2de5..b3601aef7553 100644
--- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java
+++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java
@@ -21,8 +21,11 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Arrays;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import javax.servlet.http.HttpServlet;
@@ -45,7 +48,7 @@ public class HMSCatalogServlet extends HttpServlet {
   private static final Logger LOG = LoggerFactory.getLogger(HMSCatalogServlet.class);
   private static final String CONTENT_TYPE = "Content-Type";
   private static final String APPLICATION_JSON = "application/json";
-  
+
   private final HMSCatalogAdapter restCatalogAdapter;
   private final Map<String, String> responseHeaders =
       ImmutableMap.of(CONTENT_TYPE, APPLICATION_JSON);
@@ -75,6 +78,7 @@ protected void service(HttpServletRequest request, HttpServletResponse response)
           restCatalogAdapter.execute(
               context.method(),
               context.path(),
+              context.accessDelegationModes(),
               context.queryParams(),
               context.body(),
               handle(response));
@@ -103,6 +107,7 @@ private Consumer<ErrorResponse> handle(HttpServletResponse response) {
   public static class ServletRequestContext {
     private HTTPMethod method;
     private String path;
+    private Set<AccessDelegationMode> accessDelegationModes;
     private Map<String, String> queryParams;
     private Object body;
 
@@ -115,10 +120,12 @@ private ServletRequestContext(ErrorResponse errorResponse) {
     private ServletRequestContext(
         HTTPMethod method,
         String path,
+        Set<AccessDelegationMode> accessDelegationModes,
         Map<String, String> queryParams,
         Object body) {
       this.method = method;
       this.path = path;
+      this.accessDelegationModes = accessDelegationModes;
      this.queryParams = queryParams;
      this.body = body;
    }
@@ -144,6 +151,21 @@ static ServletRequestContext from(HttpServletRequest request) throws IOException
             .build());
       }
 
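+      // The X-Iceberg-Access-Delegation header may carry several comma-separated modes;
+      // unrecognized values are logged and dropped rather than failing the request.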
+      var accessDelegationModes = Arrays
+          .stream(Optional.ofNullable(request.getHeader("X-Iceberg-Access-Delegation")).orElse("").split(","))
+          .map(String::trim)
+          .filter(header -> !header.isEmpty())
+          .map(header -> switch (header) {
+            case "vended-credentials" -> AccessDelegationMode.VENDED_CREDENTIALS;
+            case "remote-signing" -> AccessDelegationMode.REMOTE_SIGNING;
+            default -> {
+              LOG.warn("Unknown access delegation mode: {}", header);
+              yield null;
+            }
+          })
+          .filter(Objects::nonNull)
+          .collect(Collectors.toUnmodifiableSet());
+
       Route route = routeContext.first();
       Object requestBody = null;
       if (route.requestClass() != null) {
@@ -155,7 +177,7 @@
           request.getParameterMap().entrySet().stream()
               .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()[0]));
 
-      return new ServletRequestContext(method, path, queryParams, requestBody);
+      return new ServletRequestContext(method, path, accessDelegationModes, queryParams, requestBody);
     }
 
     HTTPMethod method() {
@@ -166,6 +188,10 @@
       return path;
     }
 
+    public Set<AccessDelegationMode> accessDelegationModes() {
+      return accessDelegationModes;
+    }
+
     public Map<String, String> queryParams() {
       return queryParams;
     }
diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/IcebergVendedCredentialProvider.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/IcebergVendedCredentialProvider.java
new file mode 100644
index 000000000000..3609ed29705c
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/IcebergVendedCredentialProvider.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.credential.CompositeVendedCredentialProvider;
+import org.apache.hadoop.hive.metastore.credential.StorageAccessRequest;
+import org.apache.hadoop.hive.metastore.credential.StorageOperation;
+import org.apache.hadoop.hive.metastore.credential.VendedCredentialProvider;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactoryImpl;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.credentials.Credential;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * Provides vended storage credentials for tables served through the Iceberg REST Catalog,
+ * gated by Hive authorization checks.
+ */
+public class IcebergVendedCredentialProvider {
+  private final String catalog;
+  private final VendedCredentialProvider vendedCredentialProvider;
+  private final Supplier<HiveAuthorizer> authorizerSupplier;
+
+  public IcebergVendedCredentialProvider(Configuration conf) {
+    this.catalog = MetaStoreUtils.getDefaultCatalog(conf);
+    this.vendedCredentialProvider = new CompositeVendedCredentialProvider(conf);
+    this.authorizerSupplier = () -> {
+      try {
+        final var hiveConf = HiveConf.cloneConf(conf);
+        final var authorizerFactory = HiveUtils.getAuthorizerFactory(hiveConf,
+            HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER);
+        if (authorizerFactory == null) {
+          throw new IllegalStateException("Iceberg's Credential Vending requires Hive's Authorization Manager");
+        }
+
+        final var authenticator = HiveUtils.getAuthenticator(hiveConf,
+            HiveConf.ConfVars.HIVE_METASTORE_AUTHENTICATOR_MANAGER);
+        authenticator.setConf(hiveConf);
+
+        final var authzContextBuilder = new HiveAuthzSessionContext.Builder();
+        authzContextBuilder.setClientType(HiveAuthzSessionContext.CLIENT_TYPE.HIVEMETASTORE);
+        authzContextBuilder.setSessionString("IcebergRESTCatalog");
+        return authorizerFactory.createHiveAuthorizer(
+            new HiveMetastoreClientFactoryImpl(hiveConf), hiveConf, authenticator, authzContextBuilder.build());
+      } catch (HiveException e) {
+        throw new IllegalStateException("Failed to initialize Hive authorizer for Iceberg credential vending", e);
+      }
+    };
+  }
+
+  @VisibleForTesting
+  IcebergVendedCredentialProvider(String catalog, VendedCredentialProvider vendedCredentialProvider,
+      Supplier<HiveAuthorizer> authorizerSupplier) {
+    this.catalog = catalog;
+    this.vendedCredentialProvider = vendedCredentialProvider;
+    this.authorizerSupplier = authorizerSupplier;
+  }
+
+  /**
+   * Vends credentials for the given table identifier.
+   *
+   * @param identifier the table identifier
+   * @param location the table location
+   * @return the vended credentials
+   */
+  public List<Credential> vend(TableIdentifier identifier, String location) {
+    final String username;
+    try {
+      username = UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    final var allowedOperations = resolveAllowedOperations(identifier);
+    if (allowedOperations.isEmpty()) {
+      return Collections.emptyList();
+    }
+    final var request = new StorageAccessRequest(new Path(location), allowedOperations);
+    return vendedCredentialProvider.vend(username, Collections.singletonList(request)).stream()
+        .map(credential -> ImmutableCredential.builder()
+            .prefix(credential.prefix().toString()).config(credential.credentials()).build())
+        .map(x -> (Credential) x)
+        .toList();
+  }
+
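+  // Maps Hive privileges to storage operations: SELECT on the table grants LIST/READ,
+  // INSERT grants CREATE/DELETE under the table location.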
+  private Set<StorageOperation> resolveAllowedOperations(TableIdentifier identifier) {
+    Preconditions.checkArgument(identifier.namespace().levels().length == 1);
+    final var database = identifier.namespace().level(0);
+    final var table = identifier.name();
+
+    final var authorizer = authorizerSupplier.get();
+    final var allowedOperations = EnumSet.noneOf(StorageOperation.class);
+    if (isReadable(authorizer, database, table)) {
+      allowedOperations.add(StorageOperation.LIST);
+      allowedOperations.add(StorageOperation.READ);
+    }
+    if (isWritable(authorizer, database, table)) {
+      allowedOperations.add(StorageOperation.CREATE);
+      allowedOperations.add(StorageOperation.DELETE);
+    }
+
+    return allowedOperations;
+  }
+
+  // Check if the user has the SELECT permission
+  private boolean isReadable(HiveAuthorizer authorizer, String database, String table) {
+    final var object = new HivePrivilegeObject(
+        HivePrivilegeObject.HivePrivilegeObjectType.TABLE_OR_VIEW,
+        catalog,
+        database,
+        table
+    );
+    return isAllowed(authorizer, Collections.singletonList(object), Collections.emptyList());
+  }
+
+  // Check if the user has the INSERT INTO permission
+  private boolean isWritable(HiveAuthorizer authorizer, String database, String table) {
+    final var object = new HivePrivilegeObject(
+        HivePrivilegeObject.HivePrivilegeObjectType.TABLE_OR_VIEW,
+        catalog,
+        database,
+        table,
+        null,
+        null,
+        HivePrivilegeObject.HivePrivObjectActionType.INSERT,
+        null);
+    return isAllowed(authorizer, Collections.emptyList(), Collections.singletonList(object));
+  }
+
+  private boolean isAllowed(HiveAuthorizer authorizer, List<HivePrivilegeObject> input,
+      List<HivePrivilegeObject> output) {
+    var context = new HiveAuthzContext.Builder().build();
+    try {
+      authorizer.checkPrivileges(HiveOperationType.QUERY, input, output, context);
+      return true;
+    } catch (HiveAccessControlException e) {
+      return false;
+    } catch (HiveAuthzPluginException e) {
+      throw new IllegalStateException("Failed to check privileges for Iceberg credential vending", e);
+    }
+  }
+}
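Reviewer note: for orientation, a minimal sketch of how the adapter calls into this provider (constructor and `vend` signature are from this patch; the identifier and location are illustrative):

```java
IcebergVendedCredentialProvider provider = new IcebergVendedCredentialProvider(configuration);
// Returns an empty list when the caller holds neither SELECT nor INSERT on the table.
List<Credential> credentials =
    provider.vend(TableIdentifier.of("db", "tbl"), "s3a://bucket/warehouse/db/tbl");
```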
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestCredentialVendingAws.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestCredentialVendingAws.java
new file mode 100644
index 000000000000..206ecd6c30c1
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestCredentialVendingAws.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.UUID;
+
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
+import org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider;
+import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreExternalTest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.credential.s3.S3VendedCredentialProvider;
+import org.apache.hadoop.hive.metastore.testutils.AwsS3IntegrationTestConfig;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension;
+import org.apache.iceberg.rest.extension.MockHiveAuthorizer;
+import org.apache.iceberg.types.Types;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AccessDeniedException;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+
+@Category(MetastoreExternalTest.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class TestCredentialVendingAws {
+  private static final String ACCESS_DELEGATION_HEADER = "header.X-Iceberg-Access-Delegation";
+  private static final String AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID";
+  private static final String AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY";
+  private static final String AWS_SESSION_TOKEN = "AWS_SESSION_TOKEN";
+
+  private static final Namespace NAMESPACE = Namespace.of("ns");
+  private static final TableIdentifier TABLE = TableIdentifier.of(NAMESPACE, "test");
+  private static final Schema SCHEMA = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
+
+  @RegisterExtension
+  private static final HiveRESTCatalogServerExtension REST_CATALOG_EXTENSION = newServerExtension();
+
+  private AwsS3IntegrationTestConfig config;
+  private RESTCatalog adminCatalog;
+  private S3Client adminS3;
+  private String currentTableRoot;
+  private String tableLocation;
+  private String metadataLocation;
+
+  private static HiveRESTCatalogServerExtension newServerExtension() {
+    var builder = HiveRESTCatalogServerExtension.builder(AuthType.SIMPLE);
+    if (!AwsS3IntegrationTestConfig.isConfigured()) {
+      return builder.build();
+    }
+
+    var config = AwsS3IntegrationTestConfig.fromEnvironment();
+
+    builder.configure(ConfVars.ICEBERG_CATALOG_VENDED_CREDENTIALS_ENABLED.getVarname(), "true");
+    builder.configure(ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname(), "my-s3");
+    builder.configure(ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname() + ".my-s3.class",
+        S3VendedCredentialProvider.class.getName());
+    builder.configure(ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname() + ".my-s3.aws.role-arn",
+        config.roleArn());
+    builder.configure(ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname() + ".my-s3.aws.prefixes",
+        "%s/%s/".formatted(config.bucket(), config.basePath()));
+    builder.configure(ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname() + ".my-s3.aws.region",
+        config.regionId());
+    if (config.externalId() != null && !config.externalId().isBlank()) {
+      builder.configure(ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname() + ".my-s3.aws.external-id",
+          config.externalId());
+    }
+    builder.configure("fs.s3a.impl", S3AFileSystem.class.getName());
+    builder.configure("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A");
+    builder.configure("fs.s3a.endpoint.region", config.regionId());
+    configureS3aCredentials(builder);
+
+    return builder.build();
+  }
+
+  private static void configureS3aCredentials(HiveRESTCatalogServerExtension.Builder builder) {
+    var accessKey = System.getenv(AWS_ACCESS_KEY_ID);
+    var secretKey = System.getenv(AWS_SECRET_ACCESS_KEY);
+    var sessionToken = System.getenv(AWS_SESSION_TOKEN);
+
+    if (accessKey == null || accessKey.isBlank() || secretKey == null || secretKey.isBlank()) {
+      return;
+    }
+
+    builder.configure("fs.s3a.access.key", accessKey);
+    builder.configure("fs.s3a.secret.key", secretKey);
+    if (sessionToken != null && !sessionToken.isBlank()) {
+      builder.configure("fs.s3a.session.token", sessionToken);
+      builder.configure("fs.s3a.aws.credentials.provider", TemporaryAWSCredentialsProvider.class.getName());
+    } else {
+      builder.configure("fs.s3a.aws.credentials.provider", SimpleAWSCredentialsProvider.class.getName());
+    }
+  }
+
+  private RESTCatalog newCatalog(String user, boolean requestVendedCredentials) {
+    var properties = new HashMap<String, String>();
+    properties.put("uri", REST_CATALOG_EXTENSION.getRestEndpoint());
+    properties.put("header.x-actor-username", user);
+    properties.put("io-impl", S3FileIO.class.getName());
+    properties.put("client.region", config.regionId());
+    if (requestVendedCredentials) {
+      properties.put(ACCESS_DELEGATION_HEADER, "vended-credentials");
+    }
+    return RCKUtils.initCatalogClient(properties);
+  }
+
+  private void deletePrefix(String prefix) {
+    String continuationToken = null;
+    boolean truncated;
+    do {
+      var response = adminS3.listObjectsV2(ListObjectsV2Request.builder()
+          .bucket(config.bucket())
+          .prefix(prefix)
+          .continuationToken(continuationToken)
+          .build());
+      response.contents().forEach(object -> adminS3.deleteObject(
+          DeleteObjectRequest.builder().bucket(config.bucket()).key(object.key()).build()));
+      continuationToken = response.nextContinuationToken();
+      truncated = Boolean.TRUE.equals(response.isTruncated());
+    } while (truncated);
+  }
+
+  @BeforeAll
+  void setupAll() {
+    Assumptions.assumeTrue(
+        AwsS3IntegrationTestConfig.isConfigured(),
+        "Set HIVE_IT_AWS_INTEGRATION_TEST_ENABLED=true and configure S3 integration environment variables");
+
+    config = AwsS3IntegrationTestConfig.fromEnvironment();
+    adminS3 = S3Client.builder().region(config.region()).build();
+    adminCatalog = newCatalog("admin", false);
+
+    Assertions.assertEquals(
+        Collections.singletonList(Namespace.of("default")),
+        adminCatalog.listNamespaces());
+  }
+
+  @BeforeEach
+  void setup() {
+    RCKUtils.purgeCatalogTestEntries(adminCatalog);
+    adminCatalog.createNamespace(NAMESPACE);
+    currentTableRoot = "%s/%s".formatted(config.basePath(), UUID.randomUUID());
+    tableLocation = "s3a://%s/%s/table".formatted(config.bucket(), currentTableRoot);
+    var table = adminCatalog.buildTable(TABLE, SCHEMA).withLocation(tableLocation).create();
+    metadataLocation = ((BaseTable) table).operations().refresh().metadataFileLocation();
+  }
+
+  @AfterEach
+  void teardown() {
+    if (adminCatalog != null) {
+      RCKUtils.purgeCatalogTestEntries(adminCatalog);
+    }
+    if (adminS3 != null && currentTableRoot != null) {
+      deletePrefix(currentTableRoot);
+    }
+    currentTableRoot = null;
+    tableLocation = null;
+    metadataLocation = null;
+  }
+
+  @AfterAll
+  void teardownAll() throws Exception {
+    if (adminCatalog != null) {
+      adminCatalog.close();
+    }
+    if (adminS3 != null) {
+      adminS3.close();
+    }
+  }
+
+  @Test
+  void testWritableUser() throws IOException {
+    try (var sessionCatalog = newCatalog("USER_1", true)) {
+      var table = sessionCatalog.loadTable(TABLE);
+
+      var metadataFile = table.io().newInputFile(metadataLocation);
+      Assertions.assertTrue(metadataFile.exists());
+      try (var input = metadataFile.newStream()) {
+        Assertions.assertTrue(new String(input.readAllBytes(), StandardCharsets.UTF_8).contains(tableLocation));
+      }
+
+      var destination = tableLocation + "/credential-vending-it.txt";
+      try (var output = table.io().newOutputFile(destination).createOrOverwrite()) {
+        output.write("content".getBytes(StandardCharsets.UTF_8));
+      }
+    }
+  }
+
+  @Test
+  void testReadOnlyUser() throws IOException {
+    try (var sessionCatalog = newCatalog(MockHiveAuthorizer.READ_ONLY_USER, true)) {
+      var table = sessionCatalog.loadTable(TABLE);
+
+      var metadataFile = table.io().newInputFile(metadataLocation);
+      Assertions.assertTrue(metadataFile.exists());
+      try (var input = metadataFile.newStream()) {
+        Assertions.assertTrue(new String(input.readAllBytes(), StandardCharsets.UTF_8).contains(tableLocation));
+      }
+
+      var destination = tableLocation + "/credential-vending-it.txt";
+      var output = table.io().newOutputFile(destination).createOrOverwrite();
+      Assertions.assertThrows(AccessDeniedException.class, output::close);
+    }
+  }
+}
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestIcebergVendedCredentialProvider.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestIcebergVendedCredentialProvider.java
new file mode 100644
index 000000000000..bd7b91c23239
--- /dev/null
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestIcebergVendedCredentialProvider.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.credential.StorageAccessRequest;
+import org.apache.hadoop.hive.metastore.credential.StorageOperation;
+import org.apache.hadoop.hive.metastore.credential.VendedCredentialProvider;
+import org.apache.hadoop.hive.metastore.credential.VendedStorageCredential;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzPluginException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.credentials.Credential;
+import org.apache.iceberg.rest.credentials.ImmutableCredential;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.security.PrivilegedAction;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+
+@Category(MetastoreUnitTest.class)
+public class TestIcebergVendedCredentialProvider {
+  private static final String CATALOG = "catalog";
+  private static final String DATABASE = "database";
+  private static final String TABLE = "tbl";
+  private static final HivePrivilegeObject INPUT_OBJECT = new HivePrivilegeObject(
+      HivePrivilegeObject.HivePrivilegeObjectType.TABLE_OR_VIEW, CATALOG, DATABASE, TABLE
+  );
+  private static final HivePrivilegeObject OUTPUT_OBJECT = new HivePrivilegeObject(
+      HivePrivilegeObject.HivePrivilegeObjectType.TABLE_OR_VIEW, CATALOG, DATABASE, TABLE,
+      null,
+      null,
+      HivePrivilegeObject.HivePrivObjectActionType.INSERT,
+      null
+  );
+
+  private static void assertPrivilegeObjects(List<HivePrivilegeObject> expected, List<?> actual) {
+    Assert.assertEquals(expected.size(), actual.size());
+    var actualObjects = new ArrayList<HivePrivilegeObject>(actual.size());
+    actual.forEach(object -> actualObjects.add((HivePrivilegeObject) object));
+    for (int index = 0; index < expected.size(); index++) {
+      assertPrivilegeObject(expected.get(index), actualObjects.get(index));
+    }
+  }
+
+  private static void assertPrivilegeObject(HivePrivilegeObject expected, HivePrivilegeObject actual) {
+    Assert.assertEquals(expected.getType(), actual.getType());
+    Assert.assertEquals(expected.getCatName(), actual.getCatName());
+    Assert.assertEquals(expected.getDbname(), actual.getDbname());
+    Assert.assertEquals(expected.getObjectName(), actual.getObjectName());
+    Assert.assertEquals(expected.getActionType(), actual.getActionType());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testVendWithWritableUser() throws HiveAccessControlException, HiveAuthzPluginException {
+    var authorizer = Mockito.mock(HiveAuthorizer.class);
+    var delegate = Mockito.mock(VendedCredentialProvider.class);
+    var username = "writable";
+    var operations = EnumSet.of(StorageOperation.LIST, StorageOperation.READ, StorageOperation.CREATE,
+        StorageOperation.DELETE);
+    var path = new Path("s3a://bucket/path");
+    var requests = List.of(new StorageAccessRequest(path, operations));
+    var credential = List.of(new VendedStorageCredential(path, Map.of("key", "k1"), Instant.MAX));
+    Mockito.when(delegate.vend(username, requests)).thenReturn(credential);
+    var provider = new IcebergVendedCredentialProvider(CATALOG, delegate, () -> authorizer);
+    var result = UserGroupInformation.createRemoteUser(username).doAs((PrivilegedAction<List<Credential>>) () ->
+        provider.vend(TableIdentifier.of(DATABASE, TABLE), path.toString()));
+    var expected = ImmutableCredential.builder().prefix(path.toString()).config(Map.of("key", "k1")).build();
+    Assert.assertEquals(List.of(expected), result);
+
+    var inputCaptor = ArgumentCaptor.forClass(List.class);
+    var outputCaptor = ArgumentCaptor.forClass(List.class);
+    Mockito.verify(authorizer, Mockito.times(2)).checkPrivileges(
+        eq(HiveOperationType.QUERY),
+        inputCaptor.capture(),
+        outputCaptor.capture(),
+        any(HiveAuthzContext.class)
+    );
+    assertPrivilegeObjects(List.of(INPUT_OBJECT), inputCaptor.getAllValues().getFirst());
+    assertPrivilegeObjects(List.of(), inputCaptor.getAllValues().getLast());
+    assertPrivilegeObjects(List.of(), outputCaptor.getAllValues().getFirst());
+    assertPrivilegeObjects(List.of(OUTPUT_OBJECT), outputCaptor.getAllValues().getLast());
+    Mockito.verifyNoMoreInteractions(authorizer);
+    Mockito.verify(delegate).vend(username, requests);
+    Mockito.verifyNoMoreInteractions(delegate);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testVendWithReadOnlyUser() throws HiveAccessControlException, HiveAuthzPluginException {
+    var authorizer = Mockito.mock(HiveAuthorizer.class);
+    var delegate = Mockito.mock(VendedCredentialProvider.class);
+    var username = "readonly";
+    var operations = EnumSet.of(StorageOperation.LIST, StorageOperation.READ);
+    var path = new Path("s3a://bucket/path");
+    var requests = List.of(new StorageAccessRequest(path, operations));
+    var credential = List.of(new VendedStorageCredential(path, Map.of("key", "k1"), Instant.MAX));
+    Mockito.when(delegate.vend(username, requests)).thenReturn(credential);
+    Mockito.doAnswer(invocation -> {
+      if (!((List<?>) invocation.getArgument(2)).isEmpty()) {
+        throw new HiveAccessControlException("write denied");
+      }
+      return null;
+    }).when(authorizer).checkPrivileges(any(), any(), any(), any());
+
+    var provider = new IcebergVendedCredentialProvider(CATALOG, delegate, () -> authorizer);
+    var result = UserGroupInformation.createRemoteUser(username).doAs((PrivilegedAction<List<Credential>>) () ->
+        provider.vend(TableIdentifier.of(DATABASE, TABLE), path.toString()));
+    var expected = ImmutableCredential.builder().prefix(path.toString()).config(Map.of("key", "k1")).build();
+    Assert.assertEquals(List.of(expected), result);
+
+    var operationCaptor = ArgumentCaptor.forClass(HiveOperationType.class);
+    var inputCaptor = ArgumentCaptor.forClass(List.class);
+    var outputCaptor = ArgumentCaptor.forClass(List.class);
+    Mockito.verify(authorizer, Mockito.times(2)).checkPrivileges(
+        operationCaptor.capture(),
+        inputCaptor.capture(),
+        outputCaptor.capture(),
+        any(HiveAuthzContext.class)
+    );
+    Assert.assertEquals(List.of(HiveOperationType.QUERY, HiveOperationType.QUERY), operationCaptor.getAllValues());
+    assertPrivilegeObjects(List.of(INPUT_OBJECT), inputCaptor.getAllValues().getFirst());
+    assertPrivilegeObjects(List.of(), inputCaptor.getAllValues().getLast());
+    assertPrivilegeObjects(List.of(), outputCaptor.getAllValues().getFirst());
+    assertPrivilegeObjects(List.of(OUTPUT_OBJECT), outputCaptor.getAllValues().getLast());
+    Mockito.verifyNoMoreInteractions(authorizer);
+    Mockito.verify(delegate).vend(username, requests);
+    Mockito.verifyNoMoreInteractions(delegate);
+  }
+
+  @Test
+  public void testVendWithoutPrivileges() throws HiveAccessControlException, HiveAuthzPluginException {
+    var authorizer = Mockito.mock(HiveAuthorizer.class);
+    var delegate = Mockito.mock(VendedCredentialProvider.class);
+    var username = "denied";
+    Mockito.doThrow(new HiveAccessControlException("denied"))
+        .when(authorizer).checkPrivileges(any(), any(), any(), any());
+
+    var provider = new IcebergVendedCredentialProvider(CATALOG, delegate, () -> authorizer);
+    var result = UserGroupInformation.createRemoteUser(username).doAs((PrivilegedAction<List<Credential>>) () ->
+        provider.vend(TableIdentifier.of(DATABASE, TABLE), "s3a://bucket/path"));
+
+    Assert.assertEquals(List.of(), result);
+    Mockito.verify(authorizer, Mockito.times(2)).checkPrivileges(any(), any(), any(), any());
+    Mockito.verifyNoMoreInteractions(authorizer);
+    Mockito.verifyNoInteractions(delegate);
+  }
+}
diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizer.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizer.java
index 4dd2600d3a6f..49b1404f5b0d 100644
--- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizer.java
+++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/MockHiveAuthorizer.java
@@ -18,7 +18,9 @@
 
 package org.apache.iceberg.rest.extension;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.AbstractHiveAuthorizer;
@@ -35,7 +37,9 @@ public class MockHiveAuthorizer extends AbstractHiveAuthorizer {
   public static final String PERMISSION_TEST_USER = "permission_test_user";
+  public static final String READ_ONLY_USER = "read_only_user";
   private static final Logger LOG = LoggerFactory.getLogger(MockHiveAuthorizer.class);
+  private static final List<PrivilegeCheck> PRIVILEGE_CHECKS = new CopyOnWriteArrayList<>();
 
   private final HiveAuthenticationProvider authenticator;
 
@@ -43,6 +47,22 @@ public MockHiveAuthorizer(HiveAuthenticationProvider authenticator) {
     this.authenticator = authenticator;
   }
 
+  public record PrivilegeCheck(HiveOperationType operationType, List<HivePrivilegeObject> inputs,
+      List<HivePrivilegeObject> outputs) {
+  }
+
+  public static void clearPrivilegeChecks() {
+    PRIVILEGE_CHECKS.clear();
+  }
+
+  public static List<PrivilegeCheck> privilegeChecks() {
+    return new ArrayList<>(PRIVILEGE_CHECKS);
+  }
+
+  private static List<HivePrivilegeObject> copyPrivilegeObjects(List<HivePrivilegeObject> objects) {
+    return objects == null ? List.of() : List.copyOf(objects);
+  }
+
   @Override
   public VERSION getVersion() {
     return null;
@@ -97,9 +117,15 @@ public void checkPrivileges(HiveOperationType hiveOpType, List<HivePrivilegeObject> inputsHObjs,
       List<HivePrivilegeObject> outputHObjs, HiveAuthzContext context) throws HiveAccessControlException {
     LOG.info("Checking privileges. User={}, Operation={}, inputs={}, outputs={}", authenticator.getUserName(),
         hiveOpType, inputsHObjs, outputHObjs);
+    PRIVILEGE_CHECKS.add(new PrivilegeCheck(hiveOpType, copyPrivilegeObjects(inputsHObjs),
+        copyPrivilegeObjects(outputHObjs)));
     if (PERMISSION_TEST_USER.equals(authenticator.getUserName())) {
-      throw new HiveAccessControlException(String.format("Unauthorized. Operation=%s, inputs=%s, outputs=%s",
-          hiveOpType, inputsHObjs, outputHObjs));
+      throw new HiveAccessControlException(String.format("Unauthorized. User=%s, Operation=%s, inputs=%s, outputs=%s",
+          authenticator.getUserName(), hiveOpType, inputsHObjs, outputHObjs));
+    }
+    if (READ_ONLY_USER.equals(authenticator.getUserName()) && !outputHObjs.isEmpty()) {
+      throw new HiveAccessControlException(String.format("Unauthorized. User=%s, Operation=%s, inputs=%s, outputs=%s",
+          authenticator.getUserName(), hiveOpType, inputsHObjs, outputHObjs));
     }
   }
diff --git a/standalone-metastore/metastore-server/pom.xml b/standalone-metastore/metastore-server/pom.xml
index 64926eaba1c9..4daed1576556 100644
--- a/standalone-metastore/metastore-server/pom.xml
+++ b/standalone-metastore/metastore-server/pom.xml
@@ -379,7 +379,32 @@
       <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>arns</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>iam-policy-builder</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>sts</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>url-connection-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>bundle</artifactId>
+      <scope>test</scope>
+    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/CachedVendedCredentialProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/CachedVendedCredentialProvider.java
new file mode 100644
index 000000000000..c04a380e9e85
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/CachedVendedCredentialProvider.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Expiry;
+import org.checkerframework.checker.index.qual.NonNegative;
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * A VendedCredentialProvider that caches the results of the delegated provider.
+ */
+public class CachedVendedCredentialProvider implements VendedCredentialProvider {
+  private record CacheKey(String username, List<StorageAccessRequest> accessRequests) {}
+
+  private final VendedCredentialProvider delegate;
+  private final Cache<CacheKey, List<VendedStorageCredential>> cache;
+
+  public CachedVendedCredentialProvider(VendedCredentialProvider delegate, long maxSize, Duration maxCacheDuration,
+      Clock clock) {
+    this.delegate = delegate;
+    this.cache = Caffeine.newBuilder().maximumSize(maxSize).expireAfter(
+        new Expiry<CacheKey, List<VendedStorageCredential>>() {
+          private long calculateExpiration(List<VendedStorageCredential> credentials) {
+            var now = clock.instant();
+            // Cache for half of the soonest credential expiration to allow for clock skew,
+            // capped at maxCacheDuration.
+            var expiredIn = credentials.stream().map(VendedStorageCredential::expiredAt)
+                .map(expiredAt -> Duration.between(now, expiredAt).dividedBy(2)).min(Duration::compareTo);
+            return expiredIn.map(duration -> Math.min(duration.toNanos(), maxCacheDuration.toNanos()))
+                .orElseGet(maxCacheDuration::toNanos);
+          }
+
+          @Override
+          public long expireAfterCreate(@NotNull CachedVendedCredentialProvider.CacheKey key,
+              @NotNull List<VendedStorageCredential> value, long currentTime) {
+            return calculateExpiration(value);
+          }
+
+          @Override
+          public long expireAfterUpdate(@NotNull CachedVendedCredentialProvider.CacheKey key,
+              @NotNull List<VendedStorageCredential> value, long currentTime, @NonNegative long currentDuration) {
+            return calculateExpiration(value);
+          }
+
+          @Override
+          public long expireAfterRead(@NotNull CachedVendedCredentialProvider.CacheKey key,
+              @NotNull List<VendedStorageCredential> value, long currentTime, @NonNegative long currentDuration) {
+            return currentDuration;
+          }
+        }).build();
+  }
+
+  @Override
+  public boolean supports(StorageAccessRequest request) {
+    return delegate.supports(request);
+  }
+
+  @Override
+  public List<VendedStorageCredential> vend(String username, List<StorageAccessRequest> accessRequests) {
+    return cache.get(new CacheKey(username, accessRequests), k -> delegate.vend(username, accessRequests));
+  }
+}
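Reviewer note: a minimal wiring sketch against the constructor above (the delegate and sizing values are illustrative; CompositeVendedCredentialProvider performs this same wrapping when `cache.max-size` is set):

```java
// Wrap any provider so repeated (user, requests) lookups reuse vended credentials.
VendedCredentialProvider cached = new CachedVendedCredentialProvider(
    delegate,                // any VendedCredentialProvider
    1000,                    // maximum number of cached entries
    Duration.ofMinutes(30),  // upper bound on cache lifetime
    Clock.systemUTC());
```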
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/CompositeVendedCredentialProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/CompositeVendedCredentialProvider.java
new file mode 100644
index 000000000000..91285f1df457
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/CompositeVendedCredentialProvider.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class CompositeVendedCredentialProvider implements VendedCredentialProvider {
+  private static final class FallbackVendedCredentialProvider implements VendedCredentialProvider {
+    @Override
+    public boolean supports(StorageAccessRequest request) {
+      return true;
+    }
+
+    @Override
+    public List<VendedStorageCredential> vend(String username, List<StorageAccessRequest> accessRequests) {
+      return List.of();
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompositeVendedCredentialProvider.class);
+  private static final String PROVIDERS_KEY_PREFIX =
+      MetastoreConf.ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS.getVarname();
+  private static final String CLASS_KEY = "class";
+  private static final String CACHE_MAX_SIZE_KEY = "cache.max-size";
+  private static final String CACHE_MAX_DURATION_KEY = "cache.max-duration";
+  private static final Duration DEFAULT_MAX_CACHE_DURATION = Duration.ofMinutes(30);
+  private static final VendedCredentialProvider FALLBACK_PROVIDER = new FallbackVendedCredentialProvider();
+
+  private final List<VendedCredentialProvider> providers;
+
+  private static VendedCredentialProvider create(Configuration conf, String providerId) {
+    final var providerConfigKeyPrefix = "%s.%s.".formatted(PROVIDERS_KEY_PREFIX, providerId);
+    final var classKey = providerConfigKeyPrefix + CLASS_KEY;
+    final var clazz = conf.getClass(classKey, null, VendedCredentialProvider.class);
+    if (clazz == null) {
+      throw new IllegalArgumentException(
+          "No vended credential provider class configured for provider ID: " + providerId);
+    }
+
+    final VendedCredentialProvider provider;
+    try {
+      final var constructor = clazz.getDeclaredConstructor(String.class, Configuration.class);
+      provider = constructor.newInstance(providerConfigKeyPrefix, conf);
+    } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
+      throw new IllegalArgumentException("Failed to instantiate vended credential provider: " + clazz.getName(), e);
+    }
+
+    final var maxCacheSize = conf.getInt(providerConfigKeyPrefix + CACHE_MAX_SIZE_KEY, 0);
+    if (maxCacheSize <= 0) {
+      LOG.info("Created VendedCredentialProvider, {}, without cache", provider);
+      return provider;
+    }
+
+    final var maxCacheDuration = Duration.ofNanos(
+        conf.getTimeDuration(providerConfigKeyPrefix + CACHE_MAX_DURATION_KEY,
+            DEFAULT_MAX_CACHE_DURATION.toNanos(), TimeUnit.NANOSECONDS));
+    LOG.info("Created VendedCredentialProvider, {}, with caching (capacity={}, duration={})", provider, maxCacheSize,
+        maxCacheDuration);
+    return new CachedVendedCredentialProvider(provider, maxCacheSize, maxCacheDuration, Clock.systemUTC());
+  }
+
+  public CompositeVendedCredentialProvider(Configuration conf) {
+    this(
+        Arrays
+            .stream(
+                MetastoreConf.getTrimmedStringsVar(conf, MetastoreConf.ConfVars.CATALOG_VENDED_CREDENTIALS_PROVIDERS))
+            .filter(providerId -> !providerId.isEmpty())
+            .map(providerId -> create(conf, providerId))
+            .toList()
+    );
+  }
+
+  @VisibleForTesting
+  CompositeVendedCredentialProvider(List<VendedCredentialProvider> providers) {
+    this.providers = providers;
+  }
+
+  @Override
+  public boolean supports(StorageAccessRequest request) {
+    return true;
+  }
+
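+  // Routes each request to the first provider that claims support; requests that no
+  // provider supports fall back to a no-op provider that vends nothing.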
+  private VendedCredentialProvider providerFor(StorageAccessRequest request) {
+    return providers.stream()
+        .filter(provider -> provider.supports(request))
+        .findFirst()
+        .orElse(FALLBACK_PROVIDER);
+  }
+
+  @Override
+  public List<VendedStorageCredential> vend(String username, List<StorageAccessRequest> accessRequests) {
+    final var requestsByProvider = accessRequests.stream()
+        .collect(Collectors.groupingBy(this::providerFor, LinkedHashMap::new, Collectors.toList()));
+    return requestsByProvider.entrySet().stream()
+        .flatMap(entry -> entry.getKey().vend(username, entry.getValue()).stream())
+        .toList();
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/StorageAccessRequest.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/StorageAccessRequest.java
new file mode 100644
index 000000000000..2605f115af29
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/StorageAccessRequest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * An object containing requested access to the given path.
+ *
+ * @param location a file or directory path. It must be an absolute path
+ * @param operations allowed operations
+ */
+public record StorageAccessRequest(Path location, Set<StorageOperation> operations) {
+  public StorageAccessRequest {
+    if (!Objects.requireNonNull(location).isAbsolute()) {
+      throw new IllegalArgumentException("Location must be absolute. Got: %s".formatted(location));
+    }
+    if (Objects.requireNonNull(operations).isEmpty()) {
+      throw new IllegalArgumentException("Allowed operations cannot be empty");
+    }
+  }
+}
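Reviewer note: a quick construction fragment for the record above (bucket, path, and operations are illustrative; the compact constructor rejects relative locations and empty operation sets):

```java
StorageAccessRequest request = new StorageAccessRequest(
    new Path("s3a://bucket/warehouse/db/tbl"),                    // must be absolute
    EnumSet.of(StorageOperation.LIST, StorageOperation.READ));    // must be non-empty
```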
+      throw new IllegalArgumentException("Location must be absolute. Got: %s".formatted(location));
+    }
+    if (Objects.requireNonNull(operations).isEmpty()) {
+      throw new IllegalArgumentException("Allowed operations cannot be empty");
+    }
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/StorageOperation.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/StorageOperation.java
new file mode 100644
index 000000000000..727d69f9c07b
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/StorageOperation.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+/**
+ * The I/O operations used to access a storage system.
+ */
+public enum StorageOperation {
+  LIST, READ, CREATE, DELETE
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/VendedCredentialProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/VendedCredentialProvider.java
new file mode 100644
index 000000000000..95b75ba494cf
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/VendedCredentialProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.List;
+
+/**
+ * A credential-vending service.
+ */
+@InterfaceStability.Unstable
+public interface VendedCredentialProvider {
+  /**
+   * Checks whether this provider supports the given access request.
+   *
+   * @param request the access request
+   * @return true if this provider supports the given access request
+   */
+  boolean supports(StorageAccessRequest request);
+
+  /**
+   * Vends credentials for the given access requests.
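+   * <p>A hypothetical call, for illustration only (the principal and path below are made up):
+   * <pre>{@code
+   * List<VendedStorageCredential> credentials = provider.vend("alice",
+   *     List.of(new StorageAccessRequest(
+   *         new Path("s3://bucket/warehouse/tbl"),
+   *         EnumSet.of(StorageOperation.LIST, StorageOperation.READ))));
+   * }</pre>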
+   *
+   * @param username the authenticated username
+   * @param accessRequests the storage access requests
+   * @return a list of vended credentials
+   */
+  List<VendedStorageCredential> vend(String username, List<StorageAccessRequest> accessRequests);
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/VendedStorageCredential.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/VendedStorageCredential.java
new file mode 100644
index 000000000000..3f00a59634d9
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/VendedStorageCredential.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Vended credential properties for a storage path prefix.
+ */
+@InterfaceStability.Unstable
+public record VendedStorageCredential(Path prefix, Map<String, String> credentials, Instant expiredAt) {
+  public VendedStorageCredential {
+    Objects.requireNonNull(prefix);
+    Objects.requireNonNull(credentials);
+    Objects.requireNonNull(expiredAt);
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/s3/S3Location.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/s3/S3Location.java
new file mode 100644
index 000000000000..6e220f0d8834
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/s3/S3Location.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential.s3;
+
+import org.apache.hadoop.fs.Path;
+import software.amazon.awssdk.arns.Arn;
+
+import java.net.URI;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * An S3 location parsed from a storage URI.
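+ *
+ * <p>For example, {@code s3://bucket/warehouse/tbl} in the {@code aws} partition maps to the bucket ARN
+ * {@code arn:aws:s3:::bucket} and the wildcard ARN {@code arn:aws:s3:::bucket/warehouse/tbl/*}.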
+ */
+final class S3Location {
+  private static final Set<String> SCHEMES = Set.of("s3", "s3a", "s3n");
+
+  private final String partition;
+  private final String bucket;
+  private final String path;
+
+  private S3Location(String partition, String bucket, String path) {
+    this.partition = partition;
+    this.bucket = bucket;
+    this.path = path;
+  }
+
+  static Optional<S3Location> create(String partition, URI uri) {
+    final var scheme = uri.getScheme();
+    if (scheme == null) {
+      return Optional.empty();
+    }
+    if (!SCHEMES.contains(scheme)) {
+      return Optional.empty();
+    }
+    final var bucket = uri.getAuthority();
+    if (bucket == null) {
+      return Optional.empty();
+    }
+    final var rawPath = uri.getPath();
+    if (rawPath == null) {
+      return Optional.empty();
+    }
+    final var path = rawPath.endsWith(Path.SEPARATOR) ? rawPath : rawPath + Path.SEPARATOR;
+    return Optional.of(new S3Location(partition, bucket, path));
+  }
+
+  Arn getBucketArn() {
+    return Arn.builder().partition(partition).service("s3").resource(bucket).build();
+  }
+
+  Arn getWildCardArn() {
+    return Arn.builder().partition(partition).service("s3").resource("%s%s*".formatted(bucket, path)).build();
+  }
+
+  String getWildCardPath() {
+    return path.substring(1) + "*";
+  }
+
+  boolean matches(String prefix) {
+    final var optionalArn = Arn.tryFromString(prefix);
+    if (optionalArn.isPresent()) {
+      return getWildCardArn().toString().startsWith(prefix);
+    }
+    return "%s%s".formatted(bucket, path).startsWith(prefix);
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/s3/S3VendedCredentialProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/s3/S3VendedCredentialProvider.java
new file mode 100644
index 000000000000..24b9fd6b7866
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/credential/s3/S3VendedCredentialProvider.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.credential.s3;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.credential.StorageAccessRequest;
+import org.apache.hadoop.hive.metastore.credential.VendedCredentialProvider;
+import org.apache.hadoop.hive.metastore.credential.VendedStorageCredential;
+import software.amazon.awssdk.arns.Arn;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
+import software.amazon.awssdk.policybuilder.iam.IamEffect;
+import software.amazon.awssdk.policybuilder.iam.IamPolicy;
+import software.amazon.awssdk.policybuilder.iam.IamStatement;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link VendedCredentialProvider} implementation specialized for Amazon S3.
+ */
+public class S3VendedCredentialProvider implements VendedCredentialProvider {
+  private static final String PREFIXES_KEY = "aws.prefixes";
+  private static final String REGION_KEY = "aws.region";
+  private static final String ROLE_ARN_KEY = "aws.role-arn";
+  private static final String EXTERNAL_ID_KEY = "aws.external-id";
+  private static final String CREDENTIAL_EXPIRATION_KEY = "aws.expiration";
+  private static final String SESSION_PREFIX = "hms_";
+
+  private final Arn roleArn;
+  private final String externalId;
+  private final List<String> prefixes;
+  private final int expirationInSeconds;
+  private final StsClient stsClient;
+
+  private static StsClient createStsClient(String region) {
+    final var credentialsProvider = DefaultCredentialsProvider.builder().build();
+    final var builder = StsClient.builder().credentialsProvider(credentialsProvider);
+    return region == null ? builder.build() : builder.region(Region.of(region)).build();
+  }
+
+  private static List<String> createPrefixes(String[] prefixes) {
+    if (prefixes == null) {
+      return Collections.emptyList();
+    }
+    return Arrays.asList(prefixes);
+  }
+
+  public S3VendedCredentialProvider(String configKeyPrefix, Configuration conf) {
+    this(
+        Arn.fromString(Objects.requireNonNull(conf.get(configKeyPrefix + ROLE_ARN_KEY))),
+        conf.get(configKeyPrefix + EXTERNAL_ID_KEY),
+        createPrefixes(conf.getStrings(configKeyPrefix + PREFIXES_KEY, (String) null)),
+        (int) Math.min(
+            Integer.MAX_VALUE,
+            conf.getTimeDuration(configKeyPrefix + CREDENTIAL_EXPIRATION_KEY, 3600, TimeUnit.SECONDS)
+        ),
+        createStsClient(conf.get(configKeyPrefix + REGION_KEY))
+    );
+  }
+
+  @VisibleForTesting
+  S3VendedCredentialProvider(Arn roleArn, String externalId, List<String> prefixes, int expirationSeconds,
+      StsClient stsClient) {
+    this.prefixes = prefixes;
+    this.roleArn = Objects.requireNonNull(roleArn);
+    this.externalId = externalId;
+    this.expirationInSeconds = expirationSeconds;
+    this.stsClient = Objects.requireNonNull(stsClient);
+  }
+
+  @Override
+  public boolean supports(StorageAccessRequest request) {
+    final var optionalLocation = S3Location.create(roleArn.partition(), request.location().toUri());
+    if (optionalLocation.isEmpty()) {
+      return false;
+    }
+    if (prefixes.isEmpty()) {
+      // Accepts all legal S3 paths
+      return true;
+    }
+    final var location = optionalLocation.orElseThrow();
+    return prefixes.stream().anyMatch(location::matches);
+  }
+
+  @Override
+  public List<VendedStorageCredential> vend(String username, List<StorageAccessRequest> accessRequests) {
+    // This provider issues a single assume-role request and obtains one merged credential to reduce
+    // the number of STS calls
+    final var assumeRoleRequest = AssumeRoleRequest.builder().externalId(externalId).roleArn(roleArn.toString())
+        .roleSessionName(createRoleSessionName(username)).durationSeconds(expirationInSeconds)
+        .policy(buildPolicy(accessRequests).toJson()).build();
+    final var response = stsClient.assumeRole(assumeRoleRequest);
+    final var awsCredentials = response.credentials();
+
+    final var builder = new HashMap<String, String>();
+    builder.put("s3.access-key-id", awsCredentials.accessKeyId());
+    builder.put("s3.secret-access-key", awsCredentials.secretAccessKey());
+    builder.put("s3.session-token", awsCredentials.sessionToken());
+    final Instant expiredAt;
+    if (awsCredentials.expiration() == null) {
+      expiredAt = Instant.MAX;
+    } else {
+      expiredAt = awsCredentials.expiration();
+      final var epochMillis = String.valueOf(awsCredentials.expiration().toEpochMilli());
+      builder.put("s3.session-token-expires-at-ms", epochMillis);
+    }
+    final var credentials = Collections.unmodifiableMap(builder);
+
+    return accessRequests.stream()
+        .map(request -> new VendedStorageCredential(request.location(), credentials, expiredAt)).toList();
+  }
+
+  @Override
+  public String toString() {
+    return "S3VendedCredentialProvider{" + "role='" + roleArn + '\'' + ", prefixes=" + prefixes + '}';
+  }
+
+  private static String createRoleSessionName(String username) {
+    final var builder = new StringBuilder(SESSION_PREFIX.length() + username.length());
+    builder.append(SESSION_PREFIX);
+    for (char c : username.toCharArray()) {
+      if ("abcdefghijklmnopqrstuvwxyz0123456789,.@-".contains(
+          Character.toString(c).toLowerCase(Locale.ENGLISH))) {
+        builder.append(c);
+      } else {
+        builder.append('-');
+      }
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Creates a down-scoped policy for the given requests.
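+   *
+   * <p>For example, a single READ request on {@code s3://bucket-a/warehouse/table} produces a policy with an
+   * {@code s3:GetBucketLocation} statement on {@code arn:aws:s3:::bucket-a} and an
+   * {@code s3:GetObject}/{@code s3:GetObjectVersion} statement on
+   * {@code arn:aws:s3:::bucket-a/warehouse/table/*}.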
+   */
+  private IamPolicy buildPolicy(List<StorageAccessRequest> requests) {
+    final Map<String, IamStatement.Builder> bucketLocationBuilder = new HashMap<>();
+    final Map<String, IamStatement.Builder> listBuilder = new HashMap<>();
+    final List<String> readResources = new ArrayList<>();
+    final List<String> createResources = new ArrayList<>();
+    final List<String> deleteResources = new ArrayList<>();
+
+    requests.forEach(request -> {
+      Preconditions.checkArgument(supports(request));
+      final var s3Location = S3Location.create(roleArn.partition(), request.location().toUri()).orElseThrow();
+      final var bucketArn = s3Location.getBucketArn().toString();
+      final var wildCardArn = s3Location.getWildCardArn().toString();
+      bucketLocationBuilder.computeIfAbsent(bucketArn,
+          key -> IamStatement.builder().effect(IamEffect.ALLOW).addAction("s3:GetBucketLocation").addResource(key));
+      request.operations().forEach(action -> {
+        switch (action) {
+          case LIST -> listBuilder.computeIfAbsent(bucketArn,
+              key -> IamStatement.builder().effect(IamEffect.ALLOW).addAction("s3:ListBucket").addResource(key))
+              .addCondition(IamConditionOperator.STRING_LIKE, "s3:prefix", s3Location.getWildCardPath());
+          case READ -> readResources.add(wildCardArn);
+          case CREATE -> createResources.add(wildCardArn);
+          case DELETE -> deleteResources.add(wildCardArn);
+          default -> throw new IllegalArgumentException("Unexpected action: " + action);
+        }
+      });
+    });
+
+    final var policyBuilder = IamPolicy.builder();
+    bucketLocationBuilder.values().stream().map(IamStatement.Builder::build).forEach(policyBuilder::addStatement);
+    listBuilder.values().stream().map(IamStatement.Builder::build).forEach(policyBuilder::addStatement);
+    if (!readResources.isEmpty()) {
+      policyBuilder.addStatement(builder -> {
+        builder.effect(IamEffect.ALLOW).addAction("s3:GetObject").addAction("s3:GetObjectVersion");
+        readResources.forEach(builder::addResource);
+      });
+    }
+    if (!createResources.isEmpty()) {
+      policyBuilder.addStatement(builder -> {
+        builder.effect(IamEffect.ALLOW).addAction("s3:PutObject");
+        createResources.forEach(builder::addResource);
+      });
+    }
+    if (!deleteResources.isEmpty()) {
+      policyBuilder.addStatement(builder -> {
+        builder.effect(IamEffect.ALLOW).addAction("s3:DeleteObject");
+        deleteResources.forEach(builder::addResource);
+      });
+    }
+    return policyBuilder.build();
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/annotation/MetastoreExternalTest.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/annotation/MetastoreExternalTest.java
new file mode 100644
index 000000000000..ea436d77433d
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/annotation/MetastoreExternalTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.metastore.annotation; + +/** + * Marker interface for tests that require external systems or credentials and should only run + * when explicitly opted into. + */ +public interface MetastoreExternalTest extends MetastoreTest { +} diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/TestCachedVendedCredentialProvider.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/TestCachedVendedCredentialProvider.java new file mode 100644 index 000000000000..caeaa093c8e9 --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/TestCachedVendedCredentialProvider.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.credential; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; + +@Category(MetastoreUnitTest.class) +public class TestCachedVendedCredentialProvider { + @Test + public void testSupportsDelegatesToWrappedProvider() { + var request = new StorageAccessRequest( + new Path("s3://bucket-a/warehouse/table"), + EnumSet.of(StorageOperation.READ)); + var delegate = Mockito.mock(VendedCredentialProvider.class); + Mockito.when(delegate.supports(request)).thenReturn(true); + var provider = new CachedVendedCredentialProvider( + delegate, + 100, + Duration.ofMinutes(30), + Clock.systemUTC()); + + Assert.assertTrue(provider.supports(request)); + Mockito.verify(delegate).supports(request); + } + + @Test + public void testVend() { + var now = Instant.parse("2026-04-26T12:00:00Z"); + var path = new Path("s3://bucket-a/warehouse/table"); + var request = new StorageAccessRequest(path, EnumSet.of(StorageOperation.READ)); + var requests = List.of(request); + var response = List.of(new VendedStorageCredential(path, Map.of("token", "first"), + now.plus(Duration.ofMinutes(20)))); + var delegate = Mockito.mock(VendedCredentialProvider.class); + Mockito.when(delegate.vend("alice", requests)) + .thenReturn(response) + .thenThrow(new AssertionError("The second request should be served from cache")); + var provider = new CachedVendedCredentialProvider( + delegate, + 100, + Duration.ofMinutes(30), + Clock.fixed(now, ZoneOffset.UTC)); + + var first = provider.vend("alice", requests); + var second = provider.vend("alice", requests); + 
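+    // The second call must be served from the cache: same result instance, no extra delegate invocation.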
Assert.assertSame(first, second); + + Mockito.verify(delegate, Mockito.times(1)).vend("alice", requests); + Mockito.verifyNoMoreInteractions(delegate); + } + + @Test + public void testVendWithDifferentPrincipals() { + var now = Instant.parse("2026-04-26T12:00:00Z"); + var path = new Path("s3://bucket-a/warehouse/table"); + var request = new StorageAccessRequest(path, EnumSet.of(StorageOperation.READ)); + var requests = List.of(request); + var response = List.of(new VendedStorageCredential(path, Map.of("token", "first"), + now.plus(Duration.ofMinutes(20)))); + var delegate = Mockito.mock(VendedCredentialProvider.class); + Mockito.when(delegate.vend("alice", requests)) + .thenReturn(response) + .thenThrow(new AssertionError("The second request should be served from cache")); + Mockito.when(delegate.vend("bob", requests)) + .thenReturn(response) + .thenThrow(new AssertionError("The second request should be served from cache")); + var provider = new CachedVendedCredentialProvider( + delegate, + 100, + Duration.ofMinutes(30), + Clock.fixed(now, ZoneOffset.UTC)); + + var alice = provider.vend("alice", requests); + var bob = provider.vend("bob", requests); + Assert.assertSame(alice, bob); + + Mockito.verify(delegate, Mockito.times(1)).vend("alice", requests); + Mockito.verify(delegate, Mockito.times(1)).vend("bob", requests); + Mockito.verifyNoMoreInteractions(delegate); + } +} diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/TestCompositeVendedCredentialProvider.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/TestCompositeVendedCredentialProvider.java new file mode 100644 index 000000000000..8f44085c4f39 --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/TestCompositeVendedCredentialProvider.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.metastore.credential;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.time.Instant;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+@Category(MetastoreUnitTest.class)
+public class TestCompositeVendedCredentialProvider {
+  @Test
+  public void testVendRoutesRequestsToSupportingProviders() {
+    var firstPath = new Path("s3://bucket-a/warehouse/table-a");
+    var firstRequest = new StorageAccessRequest(firstPath, EnumSet.of(StorageOperation.READ));
+    var secondPath = new Path("s3://bucket-b/warehouse/table-b");
+    var secondRequest = new StorageAccessRequest(secondPath, EnumSet.of(StorageOperation.READ));
+    var thirdPath = new Path("s3://bucket-a/warehouse/table-c");
+    var thirdRequest = new StorageAccessRequest(thirdPath, EnumSet.of(StorageOperation.READ));
+    var requests = List.of(firstRequest, secondRequest, thirdRequest);
+    var expiration = Instant.MAX;
+
+    var firstProvider = Mockito.mock(VendedCredentialProvider.class);
+    Mockito.when(firstProvider.supports(firstRequest)).thenReturn(true);
+    Mockito.when(firstProvider.supports(secondRequest)).thenReturn(false);
+    Mockito.when(firstProvider.supports(thirdRequest)).thenReturn(true);
+    var firstCredentials = List.of(
+        new VendedStorageCredential(firstPath, Map.of("provider", "first"), expiration),
+        new VendedStorageCredential(thirdPath, Map.of("provider", "first"), expiration)
+    );
+    Mockito.when(firstProvider.vend("alice", List.of(firstRequest, thirdRequest))).thenReturn(firstCredentials);
+
+    var secondProvider = Mockito.mock(VendedCredentialProvider.class);
+    Mockito.when(secondProvider.supports(secondRequest)).thenReturn(true);
+    var secondCredentials = List.of(new VendedStorageCredential(secondPath, Map.of("provider", "second"), expiration));
+    Mockito.when(secondProvider.vend("alice", List.of(secondRequest))).thenReturn(secondCredentials);
+
+    var provider = new CompositeVendedCredentialProvider(List.of(firstProvider, secondProvider));
+
+    var credentials = provider.vend("alice", requests);
+    var expected = Stream.concat(firstCredentials.stream(), secondCredentials.stream()).toList();
+    Assert.assertEquals(expected, credentials);
+
+    Mockito.verify(firstProvider).supports(firstRequest);
+    Mockito.verify(firstProvider).supports(secondRequest);
+    Mockito.verify(firstProvider).supports(thirdRequest);
+    Mockito.verify(firstProvider).vend("alice", List.of(firstRequest, thirdRequest));
+    Mockito.verify(secondProvider).supports(secondRequest);
+    Mockito.verify(secondProvider).vend("alice", List.of(secondRequest));
+    Mockito.verifyNoMoreInteractions(firstProvider, secondProvider);
+  }
+
+  @Test
+  public void testVendUsesFirstSupportingProvider() {
+    var path = new Path("s3://bucket-a/warehouse/table-a");
+    var request = new StorageAccessRequest(path, EnumSet.of(StorageOperation.READ));
+    var requests = List.of(request);
+
+    var firstProvider = Mockito.mock(VendedCredentialProvider.class);
+    Mockito.when(firstProvider.supports(request)).thenReturn(true);
+    var expected = List.of(new VendedStorageCredential(path, Map.of("provider", "first"), Instant.MAX));
+    Mockito.when(firstProvider.vend("alice", requests)).thenReturn(expected);
+
+    var secondProvider = Mockito.mock(VendedCredentialProvider.class);
+
+    var provider = new
CompositeVendedCredentialProvider(List.of(firstProvider, secondProvider)); + + var credentials = provider.vend("alice", requests); + Assert.assertEquals(expected, credentials); + + Mockito.verify(firstProvider).supports(request); + Mockito.verify(firstProvider).vend("alice", requests); + Mockito.verifyNoInteractions(secondProvider); + } + + @Test + public void testVendFallsBackToNoopProviderWhenNoProviderSupportsRequest() { + var request = new StorageAccessRequest( + new Path("s3://bucket-a/warehouse/table-a"), + EnumSet.of(StorageOperation.READ)); + var requests = List.of(request); + + var firstProvider = Mockito.mock(VendedCredentialProvider.class); + Mockito.when(firstProvider.supports(request)).thenReturn(false); + + var secondProvider = Mockito.mock(VendedCredentialProvider.class); + Mockito.when(secondProvider.supports(request)).thenReturn(false); + + var provider = new CompositeVendedCredentialProvider(List.of(firstProvider, secondProvider)); + + var credentials = provider.vend("alice", requests); + + Assert.assertTrue(credentials.isEmpty()); + Mockito.verify(firstProvider).supports(request); + Mockito.verify(secondProvider).supports(request); + Mockito.verifyNoMoreInteractions(firstProvider, secondProvider); + } +} diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3Location.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3Location.java new file mode 100644 index 000000000000..6a149b7ce413 --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3Location.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.metastore.credential.s3;
+
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.URI;
+
+@Category(MetastoreUnitTest.class)
+public class TestS3Location {
+  @Test
+  public void testBasic() {
+    var location = S3Location.create("aws", URI.create("s3://bucket/warehouse/tbl")).orElseThrow();
+    Assert.assertEquals("arn:aws:s3:::bucket", location.getBucketArn().toString());
+    Assert.assertEquals("arn:aws:s3:::bucket/warehouse/tbl/*", location.getWildCardArn().toString());
+    Assert.assertEquals("warehouse/tbl/*", location.getWildCardPath());
+  }
+
+  @Test
+  public void testTrailingSlash() {
+    var location = S3Location.create("aws", URI.create("s3://bucket/warehouse/tbl/")).orElseThrow();
+    Assert.assertEquals("arn:aws:s3:::bucket", location.getBucketArn().toString());
+    Assert.assertEquals("arn:aws:s3:::bucket/warehouse/tbl/*", location.getWildCardArn().toString());
+    Assert.assertEquals("warehouse/tbl/*", location.getWildCardPath());
+  }
+
+  @Test
+  public void testRoot() {
+    var location = S3Location.create("aws", URI.create("s3://bucket")).orElseThrow();
+    Assert.assertEquals("arn:aws:s3:::bucket", location.getBucketArn().toString());
+    Assert.assertEquals("arn:aws:s3:::bucket/*", location.getWildCardArn().toString());
+    Assert.assertEquals("*", location.getWildCardPath());
+  }
+
+  @Test
+  public void testRootWithTrailingSlash() {
+    var location = S3Location.create("aws", URI.create("s3://bucket/")).orElseThrow();
+    Assert.assertEquals("arn:aws:s3:::bucket", location.getBucketArn().toString());
+    Assert.assertEquals("arn:aws:s3:::bucket/*", location.getWildCardArn().toString());
+    Assert.assertEquals("*", location.getWildCardPath());
+  }
+
+  @Test
+  public void testS3A() {
+    var location = S3Location.create("aws", URI.create("s3a://bucket/warehouse/tbl")).orElseThrow();
+    Assert.assertEquals("arn:aws:s3:::bucket", location.getBucketArn().toString());
+    Assert.assertEquals("arn:aws:s3:::bucket/warehouse/tbl/*", location.getWildCardArn().toString());
+    Assert.assertEquals("warehouse/tbl/*", location.getWildCardPath());
+  }
+
+  @Test
+  public void testS3N() {
+    var location = S3Location.create("aws", URI.create("s3n://bucket/warehouse/tbl")).orElseThrow();
+    Assert.assertEquals("arn:aws:s3:::bucket", location.getBucketArn().toString());
+    Assert.assertEquals("arn:aws:s3:::bucket/warehouse/tbl/*", location.getWildCardArn().toString());
+    Assert.assertEquals("warehouse/tbl/*", location.getWildCardPath());
+  }
+
+  @Test
+  public void testPartition() {
+    var location = S3Location.create("aws-us-gov", URI.create("s3://bucket/warehouse/tbl")).orElseThrow();
+    Assert.assertEquals("arn:aws-us-gov:s3:::bucket", location.getBucketArn().toString());
+    Assert.assertEquals("arn:aws-us-gov:s3:::bucket/warehouse/tbl/*", location.getWildCardArn().toString());
+    Assert.assertEquals("warehouse/tbl/*", location.getWildCardPath());
+  }
+
+  @Test
+  public void testMatchesArnPrefix() {
+    var location = S3Location.create("aws", URI.create("s3://bucket/warehouse/tbl")).orElseThrow();
+
+    Assert.assertTrue(location.matches("arn:aws:s3:::bucket"));
+    Assert.assertTrue(location.matches("arn:aws:s3:::bucket/warehouse/"));
+    Assert.assertFalse(location.matches("arn:aws:s3:::bucket/curated/"));
+  }
+
+  @Test
+  public void testMatchesBucketPrefix() {
+    var location = S3Location.create("aws",
URI.create("s3://bucket/warehouse/tbl")).orElseThrow(); + + Assert.assertTrue(location.matches("bucket")); + Assert.assertTrue(location.matches("bucket/warehouse/")); + Assert.assertFalse(location.matches("bucket/curated/")); + } + + @Test + public void testUnsupportedPaths() { + Assert.assertTrue(S3Location.create("aws", URI.create("/bucket/warehouse/tbl")).isEmpty()); + Assert.assertTrue(S3Location.create("aws", URI.create("s3b://bucket/warehouse/tbl")).isEmpty()); + Assert.assertTrue(S3Location.create("aws", URI.create("s3:///warehouse/tbl")).isEmpty()); + Assert.assertTrue(S3Location.create("aws", URI.create("s3:bucket")).isEmpty()); + } +} diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3VendedCredentialProvider.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3VendedCredentialProvider.java new file mode 100644 index 000000000000..657cae594e15 --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3VendedCredentialProvider.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.credential.s3; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; +import org.apache.hadoop.hive.metastore.credential.StorageAccessRequest; +import org.apache.hadoop.hive.metastore.credential.StorageOperation; +import org.apache.hadoop.hive.metastore.credential.VendedStorageCredential; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; +import software.amazon.awssdk.services.sts.model.Credentials; + +import java.time.Instant; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; + +@Category(MetastoreUnitTest.class) +public class TestS3VendedCredentialProvider { + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Test + public void testSupportsWhenPrefixesAreEmpty() { + var provider = new S3VendedCredentialProvider( + Arn.fromString("arn:aws:iam::123456789012:role/test-role"), + null, + Collections.emptyList(), + 3600, + Mockito.mock(StsClient.class)); + + Assert.assertTrue(provider.supports(new StorageAccessRequest( + new Path("s3://bucket-a/warehouse/table"), + EnumSet.of(StorageOperation.READ)))); + } + + @Test + public void testSupportsWithPrefix() { + var provider = new S3VendedCredentialProvider( + Arn.fromString("arn:aws:iam::123456789012:role/test-role"), + null, + List.of("bucket-a/warehouse/", "bucket-x/hive/"), + 3600, + Mockito.mock(StsClient.class)); + + var matched = new StorageAccessRequest( + new Path("s3a://bucket-a/warehouse/table"), + EnumSet.of(StorageOperation.READ)); + Assert.assertTrue(provider.supports(matched)); + var unmatched = new StorageAccessRequest( + new Path("s3a://bucket-b/warehouse/table"), + EnumSet.of(StorageOperation.READ)); + Assert.assertFalse(provider.supports(unmatched)); + var requests = Collections.singletonList(unmatched); + Assert.assertThrows(IllegalArgumentException.class, () -> provider.vend("test-user", requests)); + } + + @Test + public void testSupportsWithArnPrefix() { + var provider = new S3VendedCredentialProvider( + Arn.fromString("arn:aws:iam::123456789012:role/test-role"), + null, + List.of("arn:aws:s3:::bucket-a/warehouse/", "arn:aws:s3:::bucket-x/hive/"), + 3600, + Mockito.mock(StsClient.class)); + + var matched = new StorageAccessRequest( + new Path("s3a://bucket-a/warehouse/table"), + EnumSet.of(StorageOperation.READ)); + Assert.assertTrue(provider.supports(matched)); + var unmatched = new StorageAccessRequest( + new Path("s3a://bucket-b/warehouse/table"), + EnumSet.of(StorageOperation.READ)); + Assert.assertFalse(provider.supports(unmatched)); + Assert.assertThrows(IllegalArgumentException.class, + () -> provider.vend("test-user", Collections.singletonList(unmatched))); + } + + @Test + public void testSupportsWithUnsupportedPaths() { + var provider = new S3VendedCredentialProvider( + Arn.fromString("arn:aws:iam::123456789012:role/test-role"), + null, + Collections.emptyList(), + 3600, + Mockito.mock(StsClient.class)); + + var nonSchema = new StorageAccessRequest( + new Path("/bucket-a/warehouse/table"), + EnumSet.of(StorageOperation.READ)); + 
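+    // A path without a URI scheme cannot be resolved to an S3 bucket.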
Assert.assertFalse(provider.supports(nonSchema)); + + var nonS3 = new StorageAccessRequest( + new Path("hdfs://bucket-a/warehouse/table"), + EnumSet.of(StorageOperation.READ)); + Assert.assertFalse(provider.supports(nonS3)); + + var nonAuthority = new StorageAccessRequest( + new Path("s3a:///warehouse/table"), + EnumSet.of(StorageOperation.READ)); + Assert.assertFalse(provider.supports(nonAuthority)); + } + + @Test + public void testVend() throws Exception { + var stsClient = Mockito.mock(StsClient.class); + var requestCaptor = ArgumentCaptor.forClass(AssumeRoleRequest.class); + var accessKey = "dummy-access-key"; + var secretKey = "dummy-secret-key"; + var sessionToken = "dummy-session-token"; + var expiration = Instant.parse("2026-04-26T12:00:00Z"); + Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn( + AssumeRoleResponse.builder().credentials( + Credentials.builder() + .accessKeyId(accessKey) + .secretAccessKey(secretKey) + .sessionToken(sessionToken) + .expiration(expiration) + .build()) + .build()); + + var provider = new S3VendedCredentialProvider( + Arn.fromString("arn:aws-us-gov:iam::123456789012:role/test-role"), + "external-id", + Collections.emptyList(), + 1200, + stsClient); + var credentials = provider.vend( + "User Name+1@example.com", + List.of( + new StorageAccessRequest( + new Path("s3://bucket-realtime/warehouse/table"), + EnumSet.of(StorageOperation.LIST, StorageOperation.READ, StorageOperation.CREATE, + StorageOperation.DELETE)), + new StorageAccessRequest( + new Path("s3n://bucket-archive/warehouse/table"), + EnumSet.of(StorageOperation.LIST, StorageOperation.READ)) + ) + ); + + var expected = List.of( + new VendedStorageCredential( + new Path("s3://bucket-realtime/warehouse/table"), + Map.of( + "s3.access-key-id", accessKey, + "s3.secret-access-key", secretKey, + "s3.session-token", sessionToken, + "s3.session-token-expires-at-ms", "1777204800000" + ), + expiration + ), + new VendedStorageCredential( + new Path("s3n://bucket-archive/warehouse/table"), + Map.of( + "s3.access-key-id", accessKey, + "s3.secret-access-key", secretKey, + "s3.session-token", sessionToken, + "s3.session-token-expires-at-ms", "1777204800000" + ), + expiration + ) + ); + Assert.assertEquals(expected, credentials); + + var request = requestCaptor.getValue(); + Assert.assertEquals("arn:aws-us-gov:iam::123456789012:role/test-role", request.roleArn()); + Assert.assertEquals("external-id", request.externalId()); + Assert.assertEquals(Integer.valueOf(1200), request.durationSeconds()); + Assert.assertEquals("hms_User-Name-1@example.com", request.roleSessionName()); + + Assert.assertEquals( + MAPPER.readTree(""" + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": "s3:GetBucketLocation", + "Resource": "arn:aws-us-gov:s3:::bucket-archive" + }, + { + "Effect": "Allow", + "Action": "s3:GetBucketLocation", + "Resource": "arn:aws-us-gov:s3:::bucket-realtime" + }, + { + "Effect": "Allow", + "Action": "s3:ListBucket", + "Resource": "arn:aws-us-gov:s3:::bucket-archive", + "Condition": { + "StringLike": { + "s3:prefix": "warehouse/table/*" + } + } + }, + { + "Effect": "Allow", + "Action": "s3:ListBucket", + "Resource": "arn:aws-us-gov:s3:::bucket-realtime", + "Condition": { + "StringLike": { + "s3:prefix": "warehouse/table/*" + } + } + }, + { + "Effect": "Allow", + "Action": [ + "s3:GetObject", + "s3:GetObjectVersion" + ], + "Resource": [ + "arn:aws-us-gov:s3:::bucket-realtime/warehouse/table/*", + "arn:aws-us-gov:s3:::bucket-archive/warehouse/table/*" + ] + 
}, + { + "Effect": "Allow", + "Action": "s3:PutObject", + "Resource": "arn:aws-us-gov:s3:::bucket-realtime/warehouse/table/*" + }, + { + "Effect": "Allow", + "Action": "s3:DeleteObject", + "Resource": "arn:aws-us-gov:s3:::bucket-realtime/warehouse/table/*" + } + ] + } + """), + MAPPER.readTree(request.policy())); + } + + @Test + public void testVendWithoutExpiration() throws Exception { + var stsClient = Mockito.mock(StsClient.class); + var requestCaptor = ArgumentCaptor.forClass(AssumeRoleRequest.class); + var accessKey = "dummy-access-key"; + var secretKey = "dummy-secret-key"; + var sessionToken = "dummy-session-token"; + Mockito.when(stsClient.assumeRole(requestCaptor.capture())).thenReturn( + AssumeRoleResponse.builder().credentials( + Credentials.builder() + .accessKeyId(accessKey) + .secretAccessKey(secretKey) + .sessionToken(sessionToken) + .build()) + .build()); + + var provider = new S3VendedCredentialProvider( + Arn.fromString("arn:aws:iam::123456789012:role/test-role"), + null, + Collections.emptyList(), + 3600, + stsClient); + var credentials = provider.vend( + "user", + List.of( + new StorageAccessRequest( + new Path("s3a://bucket-a/warehouse/table"), + EnumSet.of(StorageOperation.READ)) + ) + ); + + var expected = new VendedStorageCredential( + new Path("s3a://bucket-a/warehouse/table"), + Map.of( + "s3.access-key-id", accessKey, + "s3.secret-access-key", secretKey, + "s3.session-token", sessionToken + ), + Instant.MAX + ); + Assert.assertEquals(List.of(expected), credentials); + + var request = requestCaptor.getValue(); + Assert.assertEquals("arn:aws:iam::123456789012:role/test-role", request.roleArn()); + Assert.assertNull(request.externalId()); + Assert.assertEquals(Integer.valueOf(3600), request.durationSeconds()); + Assert.assertEquals("hms_user", request.roleSessionName()); + + Assert.assertEquals( + MAPPER.readTree(""" + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": "s3:GetBucketLocation", + "Resource": "arn:aws:s3:::bucket-a" + }, + { + "Effect": "Allow", + "Action": [ + "s3:GetObject", + "s3:GetObjectVersion" + ], + "Resource": "arn:aws:s3:::bucket-a/warehouse/table/*" + } + ] + } + """), + MAPPER.readTree(request.policy())); + } +} diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3VendedCredentialProviderIntegration.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3VendedCredentialProviderIntegration.java new file mode 100644 index 000000000000..a5dd72f46e85 --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/credential/s3/TestS3VendedCredentialProviderIntegration.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.credential.s3; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.annotation.MetastoreExternalTest; +import org.apache.hadoop.hive.metastore.credential.StorageAccessRequest; +import org.apache.hadoop.hive.metastore.credential.StorageOperation; +import org.apache.hadoop.hive.metastore.credential.VendedStorageCredential; +import org.apache.hadoop.hive.metastore.testutils.AwsS3IntegrationTestConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.AccessDeniedException; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.sts.StsClient; + +import java.util.EnumSet; +import java.util.List; +import java.util.UUID; + +@Category(MetastoreExternalTest.class) +public class TestS3VendedCredentialProviderIntegration { + private static final String READABLE_FILE_CONTENT = "this-is-read-only"; + private static final String WRITABLE_FILE_CONTENT = "this-is-deletable"; + + private AwsS3IntegrationTestConfig config; + private S3Client adminS3; + private StsClient stsClient; + private String readOnlyPrefix; + private String readableKey; + private String readableKeyVersion; + private String writablePrefix; + private String writableKey; + private String writableKeyVersion; + private String deniedPrefix; + private String deniedKey; + + private StsClient createStsClient() { + stsClient = StsClient.builder().region(config.region()).build(); + return stsClient; + } + + private static S3Client createSessionS3Client(Region region, VendedStorageCredential credential) { + var sessionCredentials = AwsSessionCredentials.create( + credential.credentials().get("s3.access-key-id"), + credential.credentials().get("s3.secret-access-key"), + credential.credentials().get("s3.session-token")); + return S3Client.builder() + .region(region) + .credentialsProvider(StaticCredentialsProvider.create(sessionCredentials)) + .build(); + } + + private static void deleteObjectIfExists(S3Client s3, String bucket, String key) { + try { + s3.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()); + } catch (S3Exception e) { + if (e.statusCode() != 404) { + throw e; + } + } + } + + private void assertForbidden(Runnable command) { + Assert.assertThrows(AccessDeniedException.class, command::run); + } + + private void assertNotFound(Runnable command) { + Assert.assertThrows(NoSuchKeyException.class, command::run); + 
}
+
+  @Before
+  public void setUp() {
+    Assume.assumeTrue("Test configurations are not available", AwsS3IntegrationTestConfig.isConfigured());
+
+    config = AwsS3IntegrationTestConfig.fromEnvironment();
+    adminS3 = S3Client.builder().region(config.region()).build();
+
+    var rootPrefix = "%s/%s".formatted(config.basePath(), UUID.randomUUID());
+    readOnlyPrefix = rootPrefix + "/read-only";
+    readableKey = readOnlyPrefix + "/readable.txt";
+    writablePrefix = rootPrefix + "/read-write";
+    writableKey = writablePrefix + "/writable.txt";
+    deniedPrefix = rootPrefix + "/denied";
+    deniedKey = deniedPrefix + "/unreadable.txt";
+
+    readableKeyVersion = adminS3.putObject(
+        PutObjectRequest.builder().bucket(config.bucket()).key(readableKey).build(),
+        RequestBody.fromString(READABLE_FILE_CONTENT)).versionId();
+    Assert.assertNotNull(readableKeyVersion);
+    writableKeyVersion = adminS3.putObject(
+        PutObjectRequest.builder().bucket(config.bucket()).key(writableKey).build(),
+        RequestBody.fromString(WRITABLE_FILE_CONTENT)).versionId();
+    Assert.assertNotNull(writableKeyVersion);
+    adminS3.putObject(
+        PutObjectRequest.builder().bucket(config.bucket()).key(deniedKey).build(),
+        RequestBody.fromString("outside-scope"));
+  }
+
+  @After
+  public void tearDown() {
+    if (adminS3 != null) {
+      deleteObjectIfExists(adminS3, config.bucket(), readableKey);
+      deleteObjectIfExists(adminS3, config.bucket(), writableKey);
+      deleteObjectIfExists(adminS3, config.bucket(), deniedKey);
+      adminS3.close();
+    }
+    if (stsClient != null) {
+      stsClient.close();
+    }
+  }
+
+  @Test
+  public void testVend() {
+    var provider = new S3VendedCredentialProvider(
+        Arn.fromString(config.roleArn()),
+        config.externalId(),
+        List.of(),
+        900,
+        createStsClient());
+
+    var requests = List.of(
+        new StorageAccessRequest(
+            new Path("s3://%s/%s".formatted(config.bucket(), readOnlyPrefix)),
+            EnumSet.of(StorageOperation.LIST, StorageOperation.READ)),
+        new StorageAccessRequest(
+            new Path("s3a://%s/%s".formatted(config.bucket(), writablePrefix)),
+            EnumSet.of(StorageOperation.LIST, StorageOperation.READ, StorageOperation.CREATE, StorageOperation.DELETE))
+    );
+
+    var credentials = provider.vend("integration-test-user", requests);
+    Assert.assertEquals(2, credentials.size());
+    // This provider gets a merged credential for all access requests
+    Assert.assertEquals(credentials.get(0).credentials(), credentials.get(1).credentials());
+
+    try (var sessionS3 = createSessionS3Client(config.region(), credentials.getFirst())) {
+      var location = sessionS3.getBucketLocation(GetBucketLocationRequest.builder().bucket(config.bucket()).build());
+      Assert.assertEquals("", location.locationConstraintAsString());
+
+      // Readable path
+      var listReadable = sessionS3.listObjectsV2(
+          ListObjectsV2Request.builder().bucket(config.bucket()).prefix(readOnlyPrefix + "/").build())
+          .contents().stream().map(S3Object::key).toList();
+      Assert.assertEquals(List.of(readableKey), listReadable);
+
+      var getReadable = sessionS3.getObjectAsBytes(
+          GetObjectRequest.builder().bucket(config.bucket()).key(readableKey).versionId(readableKeyVersion).build()
+      ).asUtf8String();
+      Assert.assertEquals(READABLE_FILE_CONTENT, getReadable);
+
+      assertForbidden(() -> sessionS3.putObject(
+          PutObjectRequest.builder().bucket(config.bucket()).key(readOnlyPrefix + "/test-put-readable.txt").build(),
+          RequestBody.fromString("test-put-readable")));
+
+      assertForbidden(() -> sessionS3.deleteObject(
+          DeleteObjectRequest.builder().bucket(config.bucket()).key(readableKey).build()
+      ));
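+      // PUT and DELETE on the read-only prefix must be rejected by the down-scoped policy.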
+
+      // Writable path
+      var listWritable = sessionS3.listObjectsV2(
+          ListObjectsV2Request.builder().bucket(config.bucket()).prefix(writablePrefix + "/").build())
+          .contents().stream().map(S3Object::key).toList();
+      Assert.assertEquals(List.of(writableKey), listWritable);
+
+      var getWritable = sessionS3.getObjectAsBytes(
+          GetObjectRequest.builder().bucket(config.bucket()).key(writableKey).versionId(writableKeyVersion).build()
+      ).asUtf8String();
+      Assert.assertEquals(WRITABLE_FILE_CONTENT, getWritable);
+
+      sessionS3.putObject(
+          PutObjectRequest.builder().bucket(config.bucket()).key(writablePrefix + "/test-put-writable.txt").build(),
+          RequestBody.fromString("test-put-writable")
+      );
+      Assert.assertEquals("test-put-writable", sessionS3.getObjectAsBytes(
+          GetObjectRequest.builder().bucket(config.bucket()).key(writablePrefix + "/test-put-writable.txt").build()
+      ).asUtf8String());
+
+      sessionS3.deleteObject(
+          DeleteObjectRequest.builder().bucket(config.bucket()).key(writableKey).build());
+      assertNotFound(() -> sessionS3.getObject(
+          GetObjectRequest.builder().bucket(config.bucket()).key(writableKey).build()));
+
+      // Denied path
+      assertForbidden(() -> sessionS3.listObjectsV2(
+          ListObjectsV2Request.builder().bucket(config.bucket()).prefix(deniedPrefix + "/").build()));
+      assertForbidden(() -> sessionS3.getObject(
+          GetObjectRequest.builder().bucket(config.bucket()).key(deniedKey).build()));
+      assertForbidden(() -> sessionS3.putObject(
+          PutObjectRequest.builder().bucket(config.bucket()).key(deniedPrefix + "/test-put-denied.txt").build(),
+          RequestBody.fromString("test-put-denied")));
+      assertForbidden(() -> sessionS3.deleteObject(
+          DeleteObjectRequest.builder().bucket(config.bucket()).key(deniedKey).build()));
+    }
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/testutils/AwsS3IntegrationTestConfig.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/testutils/AwsS3IntegrationTestConfig.java
new file mode 100644
index 000000000000..7014803b3ce4
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/testutils/AwsS3IntegrationTestConfig.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.testutils;
+
+import software.amazon.awssdk.regions.Region;
+
+/**
+ * AWS integration tests don't run by default; they require the environment configured below.
+ *
+ * ACCOUNT_ID={your AWS account ID}
+ * HMS_PRINCIPAL_ARN="arn:aws:iam::${ACCOUNT_ID}:{user or role}"
+ *
+ * REGION=us-east-1
+ * ROLE_NAME=hive-s3-vending-test-role
+ * export HIVE_IT_AWS_INTEGRATION_TEST_ENABLED=true
+ * export HIVE_IT_S3_BUCKET="$BUCKET-$ACCOUNT_ID-$REGION-an"
+ * export HIVE_IT_S3_TEST_PATH=hive-test
+ * export HIVE_IT_S3_ROLE_ARN="arn:aws:iam::${ACCOUNT_ID}:role/$ROLE_NAME"
+ * export HIVE_IT_S3_EXTERNAL_ID=hive-s3-vending-test
+ * export HIVE_IT_S3_REGION=us-east-1
+ *
+ * aws s3api create-bucket \
+ *   --bucket "${HIVE_IT_S3_BUCKET}" \
+ *   --region "${REGION}" \
+ *   --bucket-namespace account-regional
+ *
+ * aws s3api put-bucket-versioning \
+ *   --bucket "${HIVE_IT_S3_BUCKET}" \
+ *   --versioning-configuration Status=Enabled
+ *
+ * cat > trust-policy.json <<EOF
+ * ...
+ * EOF
+ *
+ * cat > role-policy.json <<EOF
+ * ...
diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml
@@ ... @@
+    <aws.sdk.version>2.42.25</aws.sdk.version>
@@ -140,6 +141,31 @@
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>arns</artifactId>
+        <version>${aws.sdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>bundle</artifactId>
+        <version>${aws.sdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>iam-policy-builder</artifactId>
+        <version>${aws.sdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>sts</artifactId>
+        <version>${aws.sdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>software.amazon.awssdk</groupId>
+        <artifactId>url-connection-client</artifactId>
+        <version>${aws.sdk.version}</version>
+      </dependency>
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
@@ -239,6 +265,11 @@
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-aws</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>