diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java new file mode 100644 index 000000000000..1a90111452f9 --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.iceberg.hive;

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.client.builder.GetTableProjectionsSpecBuilder;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fetches the current metadata location of an Iceberg table from the Hive Metastore.
 * <p>
 * Since the metadata location mutates with each committed transaction, comparing it against a
 * cached table's location allows determining whether the cached version is the latest known in
 * the HMS database.
 */
public class MetadataLocator {
  private static final Logger LOGGER = LoggerFactory.getLogger(MetadataLocator.class);
  /** Projection spec restricting the Thrift fetch to table.parameters only. */
  private static final GetProjectionsSpec PARAM_SPEC = new GetTableProjectionsSpecBuilder()
      .includeParameters()
      .build();
  /** The catalog whose HMS client pool is used for lookups. */
  private final HiveCatalog catalog;

  public MetadataLocator(HiveCatalog catalog) {
    this.catalog = catalog;
  }

  public HiveCatalog getCatalog() {
    return catalog;
  }

  /**
   * Returns the current metadata location of the table identified by the given identifier.
   * <p>
   * This uses the Thrift API with a parameters-only projection, which is more efficient than
   * fetching the entire table object.
   *
   * @param identifier the identifier of the table (or metadata table) to fetch the location for
   * @return the metadata location, or null if the identifier is not valid for this catalog or
   *         the location parameter is absent
   * @throws NoSuchTableException if the table does not exist
   */
  public String getLocation(TableIdentifier identifier) {
    final ClientPool<IMetaStoreClient, TException> clients = catalog.clientPool();
    final String catName = catalog.name();
    final TableIdentifier baseTableIdentifier;
    if (!catalog.isValidIdentifier(identifier)) {
      if (!isValidMetadataIdentifier(identifier)) {
        return null;
      }
      // The identifier names a metadata table (db.table.metadata_type): resolve its base table,
      // whose HMS parameters carry the metadata location.
      baseTableIdentifier = TableIdentifier.of(identifier.namespace().levels());
    } else {
      baseTableIdentifier = identifier;
    }
    String database = baseTableIdentifier.namespace().level(0);
    String tableName = baseTableIdentifier.name();
    try {
      List<Table> tables = clients.run(
          client -> client.getTables(catName, database, Collections.singletonList(tableName), PARAM_SPEC)
      );
      return tables == null || tables.isEmpty()
          ? null
          : tables.getFirst().getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
    } catch (NoSuchTableException e) {
      LOGGER.debug("Table {} not found: {}", baseTableIdentifier, e.getMessage());
      throw e;
    } catch (NoSuchObjectException e) {
      // NoSuchObjectException is a TException subtype; caught first to map it to the Iceberg
      // exception callers expect.
      throw new NoSuchTableException("Table %s not found: %s", baseTableIdentifier, e.getMessage());
    } catch (TException e) {
      LOGGER.info("Table {} parameters fetch failed: {}", baseTableIdentifier, e.getMessage());
      throw new RuntimeException("Failed to fetch table parameters for " + baseTableIdentifier, e);
    } catch (InterruptedException e) {
      // Restore the interrupt status before surfacing the failure.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while fetching table parameters for " + baseTableIdentifier, e);
    }
  }

  /**
   * Whether the identifier names a metadata table: its last segment is a known
   * {@link MetadataTableType} and the remaining segments identify a valid base table.
   */
  private boolean isValidMetadataIdentifier(TableIdentifier identifier) {
    return MetadataTableType.from(identifier.name()) != null
        && catalog.isValidIdentifier(TableIdentifier.of(identifier.namespace().levels()));
  }
}
See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.iceberg.rest; import com.github.benmanes.caffeine.cache.Ticker; + +import java.lang.ref.SoftReference; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.CachingCatalog; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -33,58 +47,310 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; - +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class that wraps an Iceberg Catalog to cache tables. 
*/ public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference cacheRef = new SoftReference<>(null); + + @TestOnly @SuppressWarnings("unchecked") + public static C getLatestCache(Function extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. + // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. 
+ private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + this.caseSensitive = caseSensitive; + this.metadataLocator = new MetadataLocator(catalog); + Configuration conf = catalog.getConf(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + // Only keep a reference to the latest cache for testing purpose, so that tests can manipulate the catalog. + cacheRef = new SoftReference<>(this); + } + int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32); + int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000); + if (l1size > 0 && l1ttl > 0) { + l1Cache = Collections.synchronizedMap(new LinkedHashMap() { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > l1CacheSize; + } + }); + l1Ttl = l1ttl; + l1CacheSize = l1size; + } else { + l1Cache = Collections.emptyMap(); + l1Ttl = 0; + l1CacheSize = 0; + } + } + + /** + * Callback when cache invalidates the entry for a given table identifier. + * + * @param tid the table identifier to invalidate + */ + protected void onCacheInvalidate(TableIdentifier tid) { + long count = cacheInvalidateCount.incrementAndGet(); + LOG.debug("Cache invalidate {}: {}", tid, count); + } + + /** + * Callback when cache loads a table for a given table identifier. 
+ * + * @param tid the table identifier + */ + protected void onCacheLoad(TableIdentifier tid) { + long count = cacheLoadCount.incrementAndGet(); + LOG.debug("Cache load {}: {}", tid, count); + } + + /** + * Callback when cache hit for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheHit(TableIdentifier tid) { + long count = cacheHitCount.incrementAndGet(); + LOG.debug("Cache hit {} : {}", tid, count); } + /** + * Callback when cache miss occurs for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheMiss(TableIdentifier tid) { + long count = cacheMissCount.incrementAndGet(); + LOG.debug("Cache miss {}: {}", tid, count); + } + + /** + * Callback when cache loads a metadata table for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheMetaLoad(TableIdentifier tid) { + long count = cacheMetaLoadCount.incrementAndGet(); + LOG.debug("Cache meta-load {}: {}", tid, count); + } + + // Getter methods for accessing metrics + public long getCacheHitCount() { + return cacheHitCount.get(); + } + + public long getCacheMissCount() { + return cacheMissCount.get(); + } + + public long getCacheLoadCount() { + return cacheLoadCount.get(); + } + + public long getCacheInvalidateCount() { + return cacheInvalidateCount.get(); + } + + public long getCacheMetaLoadCount() { + return cacheMetaLoadCount.get(); + } + + public double getCacheHitRate() { + long hits = cacheHitCount.get(); + long total = hits + cacheMissCount.get(); + return total == 0 ? 0.0 : (double) hits / total; + } + + /** + * Generates a map of this cache's performance metrics, including hit count, + * miss count, load count, invalidate count, meta-load count, and hit rate. + * This can be used for monitoring and debugging purposes to understand the effectiveness of the cache. 
+ * @return a map of cache performance metrics + */ + public Map cacheStats() { + return Map.of( + "hit", getCacheHitCount(), + "miss", getCacheMissCount(), + "load", getCacheLoadCount(), + "invalidate", getCacheInvalidateCount(), + "metaload", getCacheMetaLoadCount(), + "hit-rate", getCacheHitRate() + ); + } + + @Override - public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) { - return hiveCatalog.buildTable(identifier, schema); + public void createNamespace(Namespace namespace, Map map) { + hiveCatalog.createNamespace(namespace, map); } @Override - public void createNamespace(Namespace nmspc, Map map) { - hiveCatalog.createNamespace(nmspc, map); + public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + return hiveCatalog.listNamespaces(namespace); + } + + /** + * Canonicalizes the given table identifier based on the case sensitivity of the underlying catalog. + * Copied from CachingCatalog that exposes it as private. + * @param tableIdentifier the table identifier to canonicalize + * @return the canonicalized table identifier + */ + private TableIdentifier canonicalizeIdentifier(TableIdentifier tableIdentifier) { + return this.caseSensitive ? tableIdentifier : tableIdentifier.toLowerCase(); } @Override - public List listNamespaces(Namespace nmspc) throws NoSuchNamespaceException { - return hiveCatalog.listNamespaces(nmspc); + public void invalidateTable(TableIdentifier ident) { + super.invalidateTable(ident); + l1Cache.remove(ident); + } + + @Override + public Table loadTable(final TableIdentifier identifier) { + final TableIdentifier canonicalized = canonicalizeIdentifier(identifier); + final Table cachedTable = tableCache.getIfPresent(canonicalized); + long now = System.currentTimeMillis(); + if (cachedTable != null) { + // Determine if L1 cache is valid based on the last cached time and the TTL. 
+ // If the table is in L1 cache, we can skip the location check and return the cached table directly, + // which can significantly reduce the latency for repeated access to the same table. + Long lastCached = l1Cache.get(canonicalized); + if (lastCached != null) { + if (now - lastCached < l1Ttl) { + LOG.debug("Table {} is in L1 cache, returning cached table", canonicalized); + onCacheHit(canonicalized); + return cachedTable; + } else { + l1Cache.remove(canonicalized); + } + } + // If the table is no longer in L1 cache, we need to check the location. + final String location = metadataLocator.getLocation(canonicalized); + if (location == null) { + LOG.debug("Table {} has no location, returning cached table without location", canonicalized); + onCacheHit(canonicalized); + l1Cache.put(canonicalized, now); + return cachedTable; + } + String cachedLocation = cachedTable instanceof HasTableOperations tableOps + ? tableOps.operations().current().metadataFileLocation() + : null; + if (location.equals(cachedLocation)) { + onCacheHit(canonicalized); + l1Cache.put(canonicalized, now); + return cachedTable; + } else { + LOG.debug("Invalidate table {}, cached {} != actual {}", canonicalized, cachedLocation, location); + // Invalidate the cached table if the location is different + invalidateTable(canonicalized); + onCacheInvalidate(canonicalized); + } + } else { + onCacheMiss(canonicalized); + } + // The following code is copied from CachingCatalog.loadTable(), but with additional handling for L1 cache and stats. + final Table table = tableCache.get(canonicalized, this::loadTableWithoutCache); + if (table instanceof BaseMetadataTable) { + // Cache underlying table: there must be a table named by the namespace (?) 
+ TableIdentifier originTableIdentifier = TableIdentifier.of(canonicalized.namespace().levels()); + Table originTable = tableCache.get(originTableIdentifier, this::loadTableWithoutCache); + // Share TableOperations instance of origin table for all metadata tables, so that metadata + // table instances are refreshed as well when origin table instance is refreshed. + if (originTable instanceof HasTableOperations tableOps) { + TableOperations ops = tableOps.operations(); + MetadataTableType type = MetadataTableType.from(canonicalized.name()); + // Defensive: CachingCatalog doesn't perform this check + if (type != null) { + Table metadataTable = MetadataTableUtils.createMetadataTableInstance(ops, hiveCatalog.name(), originTableIdentifier, canonicalized, type); + tableCache.put(canonicalized, metadataTable); + l1Cache.put(canonicalized, now); + onCacheMetaLoad(canonicalized); + LOG.debug("Loaded metadata table: {} for origin table: {}", canonicalized, originTableIdentifier); + // Return the metadata table instead of the original table + return metadataTable; + } + } + } + l1Cache.put(canonicalized, now); + onCacheLoad(canonicalized); + return table; + } + + private Table loadTableWithoutCache(TableIdentifier identifier) { + return hiveCatalog.loadTable(identifier); } @Override - public Map loadNamespaceMetadata(Namespace nmspc) throws NoSuchNamespaceException { - return hiveCatalog.loadNamespaceMetadata(nmspc); + public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + return hiveCatalog.loadNamespaceMetadata(namespace); } @Override - public boolean dropNamespace(Namespace nmspc) throws NamespaceNotEmptyException { - List tables = listTables(nmspc); + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + List tables = listTables(namespace); for (TableIdentifier ident : tables) { invalidateTable(ident); } - return hiveCatalog.dropNamespace(nmspc); + return hiveCatalog.dropNamespace(namespace); } @Override 
- public boolean setProperties(Namespace nmspc, Map map) throws NoSuchNamespaceException { - return hiveCatalog.setProperties(nmspc, map); + public boolean setProperties(Namespace namespace, Map map) throws NoSuchNamespaceException { + return hiveCatalog.setProperties(namespace, map); } @Override - public boolean removeProperties(Namespace nmspc, Set set) throws NoSuchNamespaceException { - return hiveCatalog.removeProperties(nmspc, set); + public boolean removeProperties(Namespace namespace, Set set) throws NoSuchNamespaceException { + return hiveCatalog.removeProperties(namespace, set); } @Override @@ -92,6 +358,11 @@ public boolean namespaceExists(Namespace namespace) { return hiveCatalog.namespaceExists(namespace); } + @Override + public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) { + return hiveCatalog.buildTable(identifier, schema); + } + @Override public List listViews(Namespace namespace) { return hiveCatalog.listViews(namespace); diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java index 8a9ea142d72d..855e260c52a3 100644 --- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java @@ -7,14 +7,13 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.iceberg.rest; @@ -25,6 +24,7 @@ import java.io.IOException; import java.time.Clock; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import javax.servlet.http.HttpServletResponse; @@ -73,6 +73,7 @@ import org.apache.iceberg.rest.responses.ListTablesResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadViewResponse; +import org.apache.iceberg.rest.responses.HMSCacheStatsResponse; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; @@ -83,9 +84,11 @@ * Original @ RESTCatalogAdapter.java * Adaptor class to translate REST requests into {@link Catalog} API calls. 
*/ + public class HMSCatalogAdapter implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(HMSCatalogAdapter.class); private static final Splitter SLASH = Splitter.on('/'); + private static final String V1_CACHE_STATS = "v1/cache/stats"; private static final Map, Integer> EXCEPTION_ERROR_CODES = ImmutableMap., Integer>builder() @@ -147,7 +150,8 @@ enum Route { CREATE_VIEW(HTTPMethod.POST, ResourcePaths.V1_VIEWS, CreateViewRequest.class), UPDATE_VIEW(HTTPMethod.POST, ResourcePaths.V1_VIEW, UpdateTableRequest.class), RENAME_VIEW(HTTPMethod.POST, ResourcePaths.V1_VIEW_RENAME, RenameTableRequest.class), - DROP_VIEW(HTTPMethod.DELETE, ResourcePaths.V1_VIEW); + DROP_VIEW(HTTPMethod.DELETE, ResourcePaths.V1_VIEW), + CACHE_STATS(HTTPMethod.GET, V1_CACHE_STATS); private final HTTPMethod method; private final int requiredLength; @@ -221,6 +225,14 @@ public Class requestClass() { } } + private HMSCacheStatsResponse cacheStats() { + Map stats = Collections.emptyMap(); + if (catalog instanceof HMSCachingCatalog hmsCatalog) { + stats = hmsCatalog.cacheStats(); + } + return castResponse(HMSCacheStatsResponse.class, new HMSCacheStatsResponse(stats)); + } + private ConfigResponse config() { final List endpoints = Arrays.stream(Route.values()) .map(r -> Endpoint.create(r.method.name(), r.resourcePath)).toList(); @@ -400,10 +412,9 @@ private static void commitTransaction(Catalog catalog, CommitTransactionRequest for (UpdateTableRequest tableChange : request.tableChanges()) { Table table = catalog.loadTable(tableChange.identifier()); - if (table instanceof BaseTable) { - Transaction transaction = - Transactions.newTransaction( - tableChange.identifier().toString(), ((BaseTable) table).operations()); + if (table instanceof BaseTable baseTable) { + Transaction transaction = Transactions.newTransaction( + tableChange.identifier().toString(), baseTable.operations()); transactions.add(transaction); BaseTransaction.TransactionTable txTable = @@ -420,87 +431,36 @@ 
private static void commitTransaction(Catalog catalog, CommitTransactionRequest } @SuppressWarnings({"MethodLength", "unchecked"}) - private T handleRequest( - Route route, Map vars, Object body) { - switch (route) { - case CONFIG: - return (T) config(); - - case LIST_NAMESPACES: - return (T) listNamespaces(vars); - - case CREATE_NAMESPACE: - return (T) createNamespace(body); - - case NAMESPACE_EXISTS: - return (T) namespaceExists(vars); - - case LOAD_NAMESPACE: - return (T) loadNamespace(vars); - - case DROP_NAMESPACE: - return (T) dropNamespace(vars); - - case UPDATE_NAMESPACE: - return (T) updateNamespace(vars, body); - - case LIST_TABLES: - return (T) listTables(vars); - - case CREATE_TABLE: - return (T) createTable(vars, body); - - case DROP_TABLE: - return (T) dropTable(vars); - - case TABLE_EXISTS: - return (T) tableExists(vars); - - case LOAD_TABLE: - return (T) loadTable(vars); - - case REGISTER_TABLE: - return (T) registerTable(vars, body); - - case UPDATE_TABLE: - return (T) updateTable(vars, body); - - case RENAME_TABLE: - return (T) renameTable(body); - - case REPORT_METRICS: - return (T) reportMetrics(vars, body); - - case COMMIT_TRANSACTION: - return (T) commitTransaction(body); - - case LIST_VIEWS: - return (T) listViews(vars); - - case CREATE_VIEW: - return (T) createView(vars, body); - - case VIEW_EXISTS: - return (T) viewExists(vars); - - case LOAD_VIEW: - return (T) loadView(vars); - - case UPDATE_VIEW: - return (T) updateView(vars, body); - - case RENAME_VIEW: - return (T) renameView(body); - - case DROP_VIEW: - return (T) dropView(vars); - - default: - } - return null; + private T handleRequest(Route route, Map vars, Object body) { + return switch (route) { + case CONFIG -> (T) config(); + case LIST_NAMESPACES -> (T) listNamespaces(vars); + case CREATE_NAMESPACE -> (T) createNamespace(body); + case NAMESPACE_EXISTS -> (T) namespaceExists(vars); + case LOAD_NAMESPACE -> (T) loadNamespace(vars); + case DROP_NAMESPACE -> (T) dropNamespace(vars); 
+ case UPDATE_NAMESPACE -> (T) updateNamespace(vars, body); + case LIST_TABLES -> (T) listTables(vars); + case CREATE_TABLE -> (T) createTable(vars, body); + case DROP_TABLE -> (T) dropTable(vars); + case TABLE_EXISTS -> (T) tableExists(vars); + case LOAD_TABLE -> (T) loadTable(vars); + case REGISTER_TABLE -> (T) registerTable(vars, body); + case UPDATE_TABLE -> (T) updateTable(vars, body); + case RENAME_TABLE -> (T) renameTable(body); + case REPORT_METRICS -> (T) reportMetrics(vars, body); + case COMMIT_TRANSACTION -> (T) commitTransaction(body); + case LIST_VIEWS -> (T) listViews(vars); + case CREATE_VIEW -> (T) createView(vars, body); + case VIEW_EXISTS -> (T) viewExists(vars); + case LOAD_VIEW -> (T) loadView(vars); + case UPDATE_VIEW -> (T) updateView(vars, body); + case RENAME_VIEW -> (T) renameView(body); + case DROP_VIEW -> (T) dropView(vars); + case CACHE_STATS -> (T) cacheStats(); + }; } - T execute( HTTPMethod method, String path, diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java index d7bd3251a0c7..b8a0e6f6ea2b 100644 --- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java @@ -7,15 +7,15 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ + package org.apache.iceberg.rest; import java.lang.reflect.InvocationTargetException; diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java index 21b155d65d8d..042a4ae2b7e3 100644 --- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java @@ -7,14 +7,13 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package org.apache.iceberg.rest; @@ -29,6 +28,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.rest.HMSCatalogAdapter.Route; import org.apache.iceberg.rest.HTTPRequest.HTTPMethod; +import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.util.Pair; import org.slf4j.Logger; @@ -80,6 +80,11 @@ protected void service(HttpServletRequest request, HttpServletResponse response) if (responseBody != null) { RESTObjectMapper.mapper().writeValue(response.getWriter(), responseBody); } + } catch (RESTException e) { + // A RESTException is thrown by HMSCatalogAdapter.execute() after the error handler has + // already written the correct HTTP status and body to the response (e.g. 404, 403). + // It is not an unexpected server failure, so log at DEBUG to avoid flooding the console. + LOG.debug("REST request resulted in a client error (already handled): {}", e.getMessage()); } catch (RuntimeException | IOException e) { LOG.error("Error processing REST request", e); response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/responses/HMSCacheStatsResponse.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/responses/HMSCacheStatsResponse.java new file mode 100644 index 000000000000..f8614d0771f7 --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/responses/HMSCacheStatsResponse.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.rest.responses; + +import org.apache.iceberg.rest.RESTResponse; + +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; + +public record HMSCacheStatsResponse(Map stats) implements RESTResponse { + public HMSCacheStatsResponse(Map stats) { + this.stats = stats == null || stats.isEmpty() + ? Collections.emptyMap() + : Collections.unmodifiableMap(new TreeMap<>(stats)); + } + + @Override + public void validate() { + // nothing + } +} diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestHMSCachingCatalogStats.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestHMSCachingCatalogStats.java new file mode 100644 index 000000000000..471b9e405fd3 --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestHMSCachingCatalogStats.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.rest; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Map; + +import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType; +import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension; +import org.apache.iceberg.rest.responses.HMSCacheStatsResponse; +import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** + * Integration tests that verify the {@link HMSCachingCatalog} cache-statistics counters + * (hit, miss, load, hit-rate) are updated correctly and exposed accurately via the + * {@code GET v1/cache/stats} REST endpoint. + * + *

The server is started with {@link AuthType#NONE} so the tests focus purely on + * caching behaviour without any authentication noise. + */ +@Category(MetastoreCheckinTest.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class TestHMSCachingCatalogStats { + + /** 5 minutes expressed in milliseconds – the value injected into {@code ICEBERG_CATALOG_CACHE_EXPIRY}. */ + private static final long CACHE_EXPIRY_MS = 5 * 60 * 1_000L; + + @RegisterExtension + private static final HiveRESTCatalogServerExtension REST_CATALOG_EXTENSION = + HiveRESTCatalogServerExtension.builder(AuthType.NONE) + // Without a positive expiry the HMSCatalogFactory skips HMSCachingCatalog entirely. + .configure( + MetastoreConf.ConfVars.ICEBERG_CATALOG_CACHE_EXPIRY.getVarname(), + String.valueOf(CACHE_EXPIRY_MS)) + .configure("hive.in.test", "true") + .build(); + + private RESTCatalog catalog; + private HiveCatalog serverCatalog; + + @BeforeAll + void setupAll() { + catalog = RCKUtils.initCatalogClient(clientConfig()); + serverCatalog = HMSCachingCatalog.getLatestCache(HMSCachingCatalog::getCatalog); + Assertions.assertNotNull(serverCatalog, "Expected HMSCachingCatalog to be initialized"); + } + + /** Remove any namespace/table created by the test so each run starts clean. */ + @AfterEach + void cleanup() { + RCKUtils.purgeCatalogTestEntries(catalog); + } + + // --------------------------------------------------------------------------- + // helpers + // --------------------------------------------------------------------------- + + private java.util.Map clientConfig() { + return java.util.Map.of("uri", REST_CATALOG_EXTENSION.getRestEndpoint()); + } + + /** + * Calls the {@code GET v1/cache/stats} endpoint directly over HTTP and returns + * the deserialised {@link HMSCacheStatsResponse}. 
+ */ + private HMSCacheStatsResponse fetchCacheStats() throws Exception { + String statsUrl = REST_CATALOG_EXTENSION.getRestEndpoint() + "/v1/cache/stats"; + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(statsUrl)) + .GET() + .build(); + HttpResponse response; + try (HttpClient client = HttpClient.newHttpClient()) { + response = client.send(request, HttpResponse.BodyHandlers.ofString()); + } + Assertions.assertEquals(200, response.statusCode(), + "Expected HTTP 200 from cache stats endpoint, got: " + response.statusCode()); + return new ObjectMapper().readValue(response.body(), HMSCacheStatsResponse.class); + } + + + /** + * Verifies that the {@link HMSCachingCatalog} correctly tracks cache hits, misses and + * loads, and that those counters are accurately returned via the REST endpoint. + * + *

+ * <p>Strategy:
+ * <ol>
+ *   <li>Snapshot baseline stats before any operations so the test is isolated from
+ *       cumulative counters left by previous tests.</li>
+ *   <li>Create a namespace and a table (bypasses the cache – done via
+ *       {@link org.apache.iceberg.hive.HiveCatalog} directly).</li>
+ *   <li>First {@code loadTable} call → cache miss + actual load.</li>
+ *   <li>Second and third {@code loadTable} calls → cache hits (metadata location
+ *       has not changed, so the cached entry is still valid).</li>
+ *   <li>Fetch stats again and assert the deltas against the baseline.</li>
+ * </ol>
+ */ + @Test + void testCacheCountersAreUpdated() throws Exception { + // -- baseline --------------------------------------------------------------- + Map baseline = fetchCacheStats().stats(); + long baseHit = baseline.getOrDefault("hit", 0L).longValue(); + long baseMiss = baseline.getOrDefault("miss", 0L).longValue(); + long baseLoad = baseline.getOrDefault("load", 0L).longValue(); + + // -- exercise the cache ----------------------------------------------------- + var db = Namespace.of("caching_stats_test_db"); + var tableId = TableIdentifier.of(db, "caching_stats_test_table"); + + catalog.createNamespace(db); + catalog.createTable(tableId, new Schema()); + + // First load → cache miss + load + catalog.loadTable(tableId); + // Second load → cache hit (metadata location unchanged) + catalog.loadTable(tableId); + // Third load → cache hit + catalog.loadTable(tableId); + + // Mutate the table by appending a data file – this creates a new snapshot + // which advances METADATA_LOCATION in HMS, so the next loadTable call through + // the caching catalog will detect the stale cached location and invalidate it. 
+ Table table = serverCatalog.loadTable(tableId); + DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(table.location() + "/data/fake-0.parquet") + .withFileSizeInBytes(1024) + .withRecordCount(1) + .build(); + table.newAppend() + .appendFile(dataFile) + .commit(); + + long baseInvalidate = fetchCacheStats().stats().getOrDefault("invalidate", 0L).longValue(); + // the L1 cache has a 3 seconds default delay before it considers entries stale + Thread.sleep(3_000); + // Fourth load → cache invalidation + load (cached location != HMS location) + catalog.loadTable(tableId); + + // -- fetch updated stats via the REST endpoint ------------------------------ + Map after = fetchCacheStats().stats(); + long deltaHit = after.getOrDefault("hit", 0L).longValue() - baseHit; + long deltaMiss = after.getOrDefault("miss", 0L).longValue() - baseMiss; + long deltaLoad = after.getOrDefault("load", 0L).longValue() - baseLoad; + long deltaInvalidate = after.getOrDefault("invalidate", 0L).longValue() - baseInvalidate; + + // -- assertions ------------------------------------------------------------- + Assertions.assertTrue(deltaMiss >= 1, + "Expected at least 1 cache miss (first loadTable), but delta was: " + deltaMiss); + Assertions.assertTrue(deltaLoad >= 2, + "Expected at least 2 cache loads (initial load + post-invalidation reload), but delta was: " + deltaLoad); + Assertions.assertTrue(deltaHit >= 2, + "Expected at least 2 cache hits (second + third loadTable), but delta was: " + deltaHit); + Assertions.assertTrue(deltaInvalidate >= 1, + "Expected at least 1 cache invalidation (metadata location changed after table update), but delta was: " + deltaInvalidate); + + // hit-rate must be a valid ratio in [0.0, 1.0] + double hitRate = after.getOrDefault("hit-rate", 0.0).doubleValue(); + Assertions.assertTrue(hitRate > 0.0 && hitRate <= 1.0, + "hit-rate must be in [0.0, 1.0] but was: " + hitRate); + } +} +