diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveClientCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveClientCache.java index 718e5d38a5b0..2cf98c2d8611 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveClientCache.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveClientCache.java @@ -18,7 +18,13 @@ */ package org.apache.hadoop.hive.metastore; +import java.io.Closeable; import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -31,9 +37,11 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.annotation.NoReconnect; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.ShutdownHookManager; @@ -45,6 +53,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -83,8 +92,15 @@ private int getThreadId() { return threadId.get(); } - public static IMetaStoreClient getNonCachedHiveMetastoreClient(HiveConf hiveConf) throws MetaException { - return RetryingMetaStoreClient.getProxy(hiveConf, true); + public static IMetaStoreClient getNonCachedHiveMetastoreClient(final HiveConf hiveConf) + throws MetaException { + String mscClassName = hiveConf.get( + MetastoreConf.ConfVars.METASTORE_CLIENT_IMPL.getHiveName(), + HiveMetaStoreClient.class.getName()); + return RetryingMetaStoreClient.getProxy(hiveConf, + new Class[]{Configuration.class, HiveMetaHookLoader.class, Boolean.class}, + new Object[]{hiveConf, null, true}, + mscClassName); } public HiveClientCache(HiveConf hiveConf) { @@ -267,7 +283,20 @@ public IMetaStoreClient get(final HiveConf hiveConf) throws MetaException, IOExc cacheableHiveMetaStoreClient.acquire(); } } - return cacheableHiveMetaStoreClient; + return (IMetaStoreClient) cacheableHiveMetaStoreClient; + } + + private static Class[] getAllInterfaces(Class... classes) { + ImmutableSet.Builder> builder = ImmutableSet.builder(); + for (Class element : classes) { + if (element.isInterface()) { + builder.add(element); + continue; + } + builder.addAll(Arrays.asList(element.getInterfaces())); + } + + return builder.build().toArray(new Class[0]); } /** @@ -284,11 +313,12 @@ private ICacheableMetaStoreClient getOrCreate(final HiveClientCacheKey cacheKey) @Override public ICacheableMetaStoreClient call() throws MetaException { // This is called from HCat, so always allow embedded metastore (as was the default). - return - (ICacheableMetaStoreClient) RetryingMetaStoreClient.getProxy(cacheKey.getHiveConf(), - new Class[]{HiveConf.class, Integer.class, Boolean.class}, - new Object[]{cacheKey.getHiveConf(), timeout, true}, - CacheableHiveMetaStoreClient.class.getName()); + IMetaStoreClient metaStoreClient = getNonCachedHiveMetastoreClient(cacheKey.getHiveConf()); + + return (ICacheableMetaStoreClient) Proxy.newProxyInstance( + HiveClientCache.class.getClassLoader(), + getAllInterfaces(IMetaStoreClient.class, CacheableHiveMetaStoreClient.class), + new CacheableHiveMetaStoreClient(metaStoreClient)); } }); } catch (ExecutionException e) { @@ -357,7 +387,7 @@ public String toString() { } @InterfaceAudience.Private - public interface ICacheableMetaStoreClient extends IMetaStoreClient { + public interface ICacheableMetaStoreClient extends Closeable { @NoReconnect void acquire(); @@ -385,17 +415,32 @@ public interface ICacheableMetaStoreClient extends IMetaStoreClient { } /** - * Add # of current users on HiveMetaStoreClient, so that the client can be cleaned when no one is using it. + * Wraps a pluggable IMetaStoreClient with caching lifecycle management via InvocationHandler proxy. + * This allows any IMetaStoreClient implementation (including Glue) to be used with the cache. */ - static class CacheableHiveMetaStoreClient extends HiveMetaStoreClient implements ICacheableMetaStoreClient { + static class CacheableHiveMetaStoreClient implements InvocationHandler, ICacheableMetaStoreClient { + private static final ImmutableSet CLOSE_METHODS; + private static final String CLOSE_METHOD_NAME = "close"; private final AtomicInteger users = new AtomicInteger(0); + private final IMetaStoreClient base; private volatile boolean expiredFromCache = false; private boolean isClosed = false; - CacheableHiveMetaStoreClient(final HiveConf conf, final Integer timeout, Boolean allowEmbedded) - throws MetaException { - super(conf, null, allowEmbedded); + static { + try { + CLOSE_METHODS = ImmutableSet.of( + AutoCloseable.class.getMethod(CLOSE_METHOD_NAME), + Closeable.class.getMethod(CLOSE_METHOD_NAME), + IMetaStoreClient.class.getMethod(CLOSE_METHOD_NAME), + ICacheableMetaStoreClient.class.getMethod(CLOSE_METHOD_NAME)); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + + CacheableHiveMetaStoreClient(final IMetaStoreClient base) { + this.base = base; } /** @@ -456,7 +501,7 @@ public AtomicInteger getUsers() { public boolean isOpen() { try { // Look for an unlikely database name and see if either MetaException or TException is thrown - super.getDatabases("NonExistentDatabaseUsedForHealthCheck"); + base.getDatabases("NonExistentDatabaseUsedForHealthCheck"); } catch (TException e) { return false; } @@ -495,7 +540,7 @@ public synchronized void tearDownIfUnused() { public void tearDown() { try { if (!isClosed) { - super.close(); + base.close(); } isClosed = true; } catch (Exception e) { @@ -526,5 +571,21 @@ protected void finalize() throws Throwable { super.finalize(); } } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + try { + if (method.getDeclaringClass().isAssignableFrom(getClass())) { + return method.invoke(this, args); + } + if (CLOSE_METHODS.contains(method)) { + close(); + return null; + } + return method.invoke(base, args); + } catch (InvocationTargetException e) { + throw e.getCause(); + } + } } }