Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,34 @@ public class PhoenixStorageHandlerConstants {
FUNCTION_VALUE_MARKER;

public static final IntWritable INT_ZERO = new IntWritable(0);

/**
 * Minimum number of parallel scans (Nps) threshold (Nt) required to trigger the parallel
 * split generation method (PSGM) instead of the serial split generation method (SSGM).
 * Testing showed that SSGM is better when Nps is less than Nt; once Nps exceeds Nt,
 * PSGM is better.
 * Note: it is strongly recommended to leave this setting at its default — tuning the
 * value makes little difference. To force the legacy method (SSGM), set
 * phoenix.minimum.parallel.scans.threshold = 0.
 */
public static final String PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD =
"phoenix.minimum.parallel.scans.threshold";
/**
 * Default minimum number of parallel scans threshold; value determined by local testing.
 */
public static final int DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD = 8;

/**
 * Number of worker threads used to generate input splits using PSGM.
 * Note: the default is suitable for most use cases; it may be raised where appropriate
 * to obtain better performance.
 */
public static final String PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT =
"phoenix.inputsplit.generation.thread.count";
/**
 * Default number of worker threads used to generate input splits using PSGM:
 * twice the number of available processors.
 */
public static final int DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT =
Runtime.getRuntime().availableProcessors() * 2;

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
Expand All @@ -48,6 +54,7 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.phoenix.compat.CompatUtil;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
Expand All @@ -61,7 +68,6 @@
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.compat.CompatUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -73,7 +79,6 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
T> {

private static final Logger LOG = LoggerFactory.getLogger(PhoenixInputFormat.class);

public PhoenixInputFormat() {
if (LOG.isDebugEnabled()) {
LOG.debug("PhoenixInputFormat created");
Expand Down Expand Up @@ -119,74 +124,147 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
}

private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
final List<KeyRange> splits, String query) throws
IOException {
if (qplan == null){
final List<KeyRange> splits, final String query)
throws IOException {

if (qplan == null) {
throw new NullPointerException();
}if (splits == null){
}
if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());

Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
.newJobContext(new Job(jobConf)));
boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
final Path[] tablePaths = FileInputFormat.getInputPaths(
ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
final boolean splitByStats = jobConf.getBoolean(
PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);

final int parallelThreshold = jobConf.getInt(
PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD,
PhoenixStorageHandlerConstants.DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD);
setScanCacheSize(jobConf);
try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory
.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
qplan.getTableRef().getTable().getPhysicalName().toString()));
final int scanSize = qplan.getScans().size();
if (useParallelInputGeneration(parallelThreshold, scanSize)) {
final int parallelism = jobConf.getInt(
PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT,
PhoenixStorageHandlerConstants
.DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT);
ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
LOG.info("Generating Input Splits in Parallel with {} threads", parallelism);
List<Future<List<InputSplit>>> tasks = new ArrayList<>();

// Adding Localization
try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
.getTableRef().getTable().getPhysicalName().toString()));

for (List<Scan> scans : qplan.getScans()) {
PhoenixInputSplit inputSplit;

HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow()
, false);
long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);

if (splitByStats) {
for (Scan aScan : scans) {
if (LOG.isDebugEnabled()) {
LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan
.getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
.getBatch() + "] and regionLocation : " + regionLocation);
try {
for (final List<Scan> scans : qplan.getScans()) {
Future<List<InputSplit>> task = executorService.submit(
new Callable<List<InputSplit>>() {
@Override public List<InputSplit> call() throws Exception {
return generateSplitsInternal(query, scans, splitByStats,
connection, regionLocator, tablePaths);
}
});
tasks.add(task);
}

inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)), tablePaths[0],
regionLocation, regionSize);
inputSplit.setQuery(query);
psplits.add(inputSplit);
for (Future<List<InputSplit>> task : tasks) {
psplits.addAll(task.get());
}
} catch (ExecutionException|InterruptedException e) {
Throwable throwable = e.getCause();
if (throwable instanceof IOException) {
throw (IOException) throwable;
} else {
throw new IOException(e);
}
} finally {
executorService.shutdown();
}
} else {
LOG.info("Generating Input Splits in Serial");
for (final List<Scan> scans : qplan.getScans()) {
psplits.addAll(generateSplitsInternal(query, scans,
splitByStats, connection, regionLocator, tablePaths));
}
}
}

return psplits;
}

/**
 * Decides whether input splits should be generated in parallel to reduce time costs.
 * A non-positive threshold disables the parallel path entirely (legacy serial method).
 *
 * @param parallelThreshold configured minimum-parallel-scans threshold
 * @param scans             number of scan lists in the query plan
 * @return true when parallel split generation should be used
 */
private boolean useParallelInputGeneration(final int parallelThreshold, final int scans) {
    if (parallelThreshold <= 0) {
        return false;
    }
    return scans >= parallelThreshold;
}

/**
* This method is used to generate splits for each scan list.
* @param query phoenix query statement
* @param scans scan list slice of query plan
* @param splitByStats split by stat enabled
* @param connection phoenix connection
* @param regionLocator Hbase Region Locator
* @param tablePaths table paths
* @return List of Input Splits
* @throws IOException if function fails
*/
private List<InputSplit> generateSplitsInternal(final String query, final List<Scan> scans,
final boolean splitByStats, final org.apache.hadoop.hbase.client.Connection connection,
final RegionLocator regionLocator, final Path[] tablePaths) throws IOException {

final List<InputSplit> psplits = new ArrayList<>(scans.size());

PhoenixInputSplit inputSplit;

HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow(),
false);
long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);

if (splitByStats) {
for (Scan aScan : scans) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
.get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
.size() - 1).getStopRow()));
LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
.get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " +
"[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+ ", " + scans.get(0).getBatch() + "] and regionLocation : " +
regionLocation);

for (int i = 0, limit = scans.size(); i < limit; i++) {
LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
.toStringBinary(scans.get(i).getAttribute
(BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
}
LOG.debug("Split for scan : " + aScan + "with scanAttribute : "
+ aScan.getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : ["
+ aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", "
+ aScan.getBatch() + "] and regionLocation : " + regionLocation);
}

inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation,
regionSize);
inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)),
tablePaths[0], regionLocation, regionSize);
inputSplit.setQuery(query);
psplits.add(inputSplit);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
.get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
.size() - 1).getStopRow()));

LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
.get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : "
+ "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+ ", " + scans.get(0).getBatch() + "] and regionLocation : "
+ regionLocation);

for (int i = 0, limit = scans.size(); i < limit; i++) {
LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
.toStringBinary(scans.get(i).getAttribute
(BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
}
}

inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation, regionSize);
inputSplit.setQuery(query);
psplits.add(inputSplit);
}
}

return psplits;
}
Expand Down
Loading