Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
*/
package org.apache.hadoop.hive.metastore;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
Expand All @@ -31,9 +37,11 @@
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.annotation.NoReconnect;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
Expand All @@ -45,6 +53,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
Expand Down Expand Up @@ -83,8 +92,15 @@
return threadId.get();
}

public static IMetaStoreClient getNonCachedHiveMetastoreClient(HiveConf hiveConf) throws MetaException {
return RetryingMetaStoreClient.getProxy(hiveConf, true);
public static IMetaStoreClient getNonCachedHiveMetastoreClient(final HiveConf hiveConf)
throws MetaException {
String mscClassName = hiveConf.get(
MetastoreConf.ConfVars.METASTORE_CLIENT_IMPL.getHiveName(),
HiveMetaStoreClient.class.getName());
return RetryingMetaStoreClient.getProxy(hiveConf,
new Class[]{Configuration.class, HiveMetaHookLoader.class, Boolean.class},
new Object[]{hiveConf, null, true},
mscClassName);
}

public HiveClientCache(HiveConf hiveConf) {
Expand Down Expand Up @@ -267,7 +283,20 @@
cacheableHiveMetaStoreClient.acquire();
}
}
return cacheableHiveMetaStoreClient;
return (IMetaStoreClient) cacheableHiveMetaStoreClient;
}

private static Class<?>[] getAllInterfaces(Class<?>... classes) {
ImmutableSet.Builder<Class<?>> builder = ImmutableSet.builder();
for (Class<?> element : classes) {
if (element.isInterface()) {
builder.add(element);
continue;
}
builder.addAll(Arrays.asList(element.getInterfaces()));
}

return builder.build().toArray(new Class[0]);
}

/**
Expand All @@ -284,11 +313,12 @@
@Override
public ICacheableMetaStoreClient call() throws MetaException {
// This is called from HCat, so always allow embedded metastore (as was the default).
return
(ICacheableMetaStoreClient) RetryingMetaStoreClient.getProxy(cacheKey.getHiveConf(),
new Class<?>[]{HiveConf.class, Integer.class, Boolean.class},
new Object[]{cacheKey.getHiveConf(), timeout, true},
CacheableHiveMetaStoreClient.class.getName());
IMetaStoreClient metaStoreClient = getNonCachedHiveMetastoreClient(cacheKey.getHiveConf());

return (ICacheableMetaStoreClient) Proxy.newProxyInstance(
HiveClientCache.class.getClassLoader(),
getAllInterfaces(IMetaStoreClient.class, CacheableHiveMetaStoreClient.class),
new CacheableHiveMetaStoreClient(metaStoreClient));
}
});
} catch (ExecutionException e) {
Expand Down Expand Up @@ -357,7 +387,7 @@
}

@InterfaceAudience.Private
public interface ICacheableMetaStoreClient extends IMetaStoreClient {
public interface ICacheableMetaStoreClient extends Closeable {
@NoReconnect
void acquire();

Expand Down Expand Up @@ -385,17 +415,32 @@
}

/**
* Add # of current users on HiveMetaStoreClient, so that the client can be cleaned when no one is using it.
* Wraps a pluggable IMetaStoreClient with caching lifecycle management via InvocationHandler proxy.
* This allows any IMetaStoreClient implementation (including Glue) to be used with the cache.
*/
static class CacheableHiveMetaStoreClient extends HiveMetaStoreClient implements ICacheableMetaStoreClient {
static class CacheableHiveMetaStoreClient implements InvocationHandler, ICacheableMetaStoreClient {
private static final ImmutableSet<Method> CLOSE_METHODS;
private static final String CLOSE_METHOD_NAME = "close";

private final AtomicInteger users = new AtomicInteger(0);
private final IMetaStoreClient base;
private volatile boolean expiredFromCache = false;
private boolean isClosed = false;

CacheableHiveMetaStoreClient(final HiveConf conf, final Integer timeout, Boolean allowEmbedded)
throws MetaException {
super(conf, null, allowEmbedded);
static {
try {
CLOSE_METHODS = ImmutableSet.of(
AutoCloseable.class.getMethod(CLOSE_METHOD_NAME),
Closeable.class.getMethod(CLOSE_METHOD_NAME),
IMetaStoreClient.class.getMethod(CLOSE_METHOD_NAME),
ICacheableMetaStoreClient.class.getMethod(CLOSE_METHOD_NAME));
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);

Check warning on line 438 in metastore/src/java/org/apache/hadoop/hive/metastore/HiveClientCache.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace generic exceptions with specific library exceptions or a custom exception.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ31C87VHq9gpljoeUv1&open=AZ31C87VHq9gpljoeUv1&pullRequest=6462
}
}

CacheableHiveMetaStoreClient(final IMetaStoreClient base) {
this.base = base;
}

/**
Expand Down Expand Up @@ -456,7 +501,7 @@
public boolean isOpen() {
try {
// Look for an unlikely database name and see if either MetaException or TException is thrown
super.getDatabases("NonExistentDatabaseUsedForHealthCheck");
base.getDatabases("NonExistentDatabaseUsedForHealthCheck");
} catch (TException e) {
return false;
}
Expand Down Expand Up @@ -495,7 +540,7 @@
public void tearDown() {
try {
if (!isClosed) {
super.close();
base.close();
}
isClosed = true;
} catch (Exception e) {
Expand Down Expand Up @@ -526,5 +571,21 @@
super.finalize();
}
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try {
if (method.getDeclaringClass().isAssignableFrom(getClass())) {
return method.invoke(this, args);
}
if (CLOSE_METHODS.contains(method)) {
close();
return null;
}
return method.invoke(base, args);
} catch (InvocationTargetException e) {
throw e.getCause();
}
}
}
}
Loading