From eeb308af99d21ff39ee311fe00ae942ab3cf067c Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 01/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables. --- .../hive/mapreduce/PhoenixInputFormat.java | 128 +++++++++----- .../hive/HivePhoenixInputFormatTest.java | 167 ++++++++++++++++++ 2 files changed, 250 insertions(+), 45 deletions(-) create mode 100644 phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java diff --git a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java index 091f9ea8..0c81d4b5 100644 --- a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java +++ b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -24,6 +24,8 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.concurrent.*; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; @@ -128,66 +130,102 @@ private List generateSplits(final JobConf jobConf, final QueryPlan q } final List psplits = new ArrayList<>(splits.size()); - Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims() + final Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims() .newJobContext(new Job(jobConf))); - boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS, + final boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS, false); - + final int parallelThreshould = jobConf.getInt("hive.phoenix.split.parallel.threshold", 32); setScanCacheSize(jobConf); + if ( (parallelThreshould) <=0 || (qplan.getScans().size() < parallelThreshould) ){ + LOG.info("generate splits in serial"); + for 
(final List scans : qplan.getScans()) { + psplits.addAll(generateSplitsInternal(jobConf,qplan, splits, query,scans,splitByStats,tablePaths)); + } + } else { + final int parallism = jobConf.getInt("hive.phoenix.split.parallel.level",Runtime.getRuntime().availableProcessors()*2); + ExecutorService executorService = Executors.newFixedThreadPool(parallism); + LOG.info("generate splits in parallel with {} threads",parallism); + + List>> tasks = new ArrayList<>(); + + try{ + for (final List scans : qplan.getScans()) { + Future> task= executorService.submit(new Callable>() { + @Override + public List call() throws Exception { + return generateSplitsInternal(jobConf,qplan, splits, query,scans,splitByStats,tablePaths); + } + }); + tasks.add(task); + } + for (Future> task : tasks) { + psplits.addAll(task.get()); + } + }catch (ExecutionException | InterruptedException exception){ + throw new RuntimeException("failed to get splits,reason:",exception); + }finally { + executorService.shutdown(); + } + } + return psplits; + } + + private List generateSplitsInternal(final JobConf jobConf, final QueryPlan qplan, + final List splits, final String query,final List scans,final boolean splitByStats,final Path[] tablePaths) throws + IOException { - // Adding Localization - try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) { - RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan - .getTableRef().getTable().getPhysicalName().toString())); + final List psplits = new ArrayList<>(scans.size()); + try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection( + PhoenixConnectionUtil.getConfiguration(jobConf))) { + RegionLocator + regionLocator = + connection.getRegionLocator(TableName.valueOf( + qplan.getTableRef().getTable().getPhysicalName().toString())); - for (List scans : qplan.getScans()) { - PhoenixInputSplit inputSplit; + { + 
PhoenixInputSplit inputSplit; - HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow() - , false); - long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location); - String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG); + HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow(), false); + long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location); + String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG); - if (splitByStats) { - for (Scan aScan : scans) { + if (splitByStats) { + for (Scan aScan : scans) { + if (LOG.isDebugEnabled()) { + LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan.getAttributesMap() + + " [scanCache, cacheBlock, scanBatch] : [" + aScan.getCaching() + + ", " + aScan.getCacheBlocks() + ", " + aScan.getBatch() + "] and regionLocation : " + regionLocation); + } + + inputSplit = + new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)), + tablePaths[0], regionLocation, regionSize); + inputSplit.setQuery(query); + psplits.add(inputSplit); + } + } else { if (LOG.isDebugEnabled()) { - LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan - .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" + - aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan - .getBatch() + "] and regionLocation : " + regionLocation); + LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary( + scans.get(0).getStartRow()) + " ~ " + Bytes.toStringBinary( + scans.get(scans.size() - 1).getStopRow())); + LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans.get(0).getAttributesMap() + + " [scanCache, cacheBlock, scanBatch] : " + "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks() + + ", " + scans.get(0).getBatch() + "] and regionLocation : " + + regionLocation); + + for (int i = 0, limit = 
scans.size(); i < limit; i++) { + LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes.toStringBinary(scans.get(i).getAttribute( + BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY))); + } } - inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)), tablePaths[0], - regionLocation, regionSize); + inputSplit = + new PhoenixInputSplit(scans, tablePaths[0], regionLocation, regionSize); inputSplit.setQuery(query); psplits.add(inputSplit); } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans - .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans - .size() - 1).getStopRow())); - LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans - .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " + - "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks() - + ", " + scans.get(0).getBatch() + "] and regionLocation : " + - regionLocation); - - for (int i = 0, limit = scans.size(); i < limit; i++) { - LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes - .toStringBinary(scans.get(i).getAttribute - (BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY))); - } - } - - inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation, - regionSize); - inputSplit.setQuery(query); - psplits.add(inputSplit); } } - } - return psplits; } diff --git a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java new file mode 100644 index 00000000..f7370326 --- /dev/null +++ b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.end2end.ParallelStatsDisabledTest; +import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; +import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat; +import org.apache.phoenix.mapreduce.PhoenixRecordWritable; +import org.apache.phoenix.schema.TableAlreadyExistsException; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; +import java.io.IOException; +import java.sql.*; +import java.util.Locale; +import java.util.Properties; + +/** + * Test class for Hive PhoenixInputFormat + */ +@NotThreadSafe +@Category(ParallelStatsDisabledTest.class) +public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT { + private static final Logger LOG = LoggerFactory.getLogger(HivePhoenixInputFormatTest.class); + private static final String TABLE_NAME = "HivePhoenixInputFormatTest".toUpperCase(Locale.ROOT); + private static final String DDL = "CREATE TABLE " + TABLE_NAME + " (V1 varchar NOT NULL PRIMARY KEY, V2 
integer)"; + private static final int SPLITS = 128; + + @Test + public void testGetSplitsSerialOrParallel() throws IOException,SQLException { + PhoenixInputFormat inputFormat = new PhoenixInputFormat(); + long start,end; + + // create table with N splits + System.out.println(String.format("generate testing table with %s splits",String.valueOf(SPLITS))); + setupTestTable(); + // setup configuration required for PhoenixInputFormat + Configuration conf = getUtility().getConfiguration(); + JobConf jobConf = new JobConf(conf); + configureTestInput(jobConf); + + + // test get splits in serial + start = System.currentTimeMillis(); + jobConf.set("hive.phoenix.split.parallel.threshold","0"); + InputSplit[] inputSplitsSerial = inputFormat.getSplits(jobConf,SPLITS); + end = System.currentTimeMillis(); + long durationInSerial=end - start; + System.out.println(String.format("get split in serial requires:%s ms",String.valueOf(durationInSerial))); + + // test get splits in parallel + start = System.currentTimeMillis(); + jobConf.set("hive.phoenix.split.parallel.threshold","1"); + InputSplit[] inputSplitsParallel = inputFormat.getSplits(jobConf,SPLITS); + end = System.currentTimeMillis(); + long durationInParallel=end - start; + + System.out.println(String.format("get split in parallel requires:%s ms",String.valueOf(durationInParallel))); + + // Test if performance of parallel method is better than serial method + Assert.assertTrue(durationInParallel < durationInSerial); + // Test if the input split returned by serial method and parallel method are the same + Assert.assertTrue(inputSplitsParallel.length==SPLITS); + Assert.assertTrue(inputSplitsParallel.length == inputSplitsSerial.length); + for (final InputSplit inputSplitParallel:inputSplitsParallel){ + boolean match=false; + for (final InputSplit inputSplitSerial:inputSplitsSerial){ + if (inputSplitParallel.equals(inputSplitSerial)){ + match=true; + break; + } + } + Assert.assertTrue(match); + } + } + + private static void 
setupTestTable() throws SQLException { + final byte [] start=new byte[0]; + final byte [] end = Bytes.createMaxByteArray(4); + final byte[][] splits = Bytes.split(start, end, SPLITS-2); + createTestTableWithBinarySplit(getUrl(),DDL, splits ,null); + } + + private static void buildPreparedSqlWithBinarySplits(StringBuffer sb,int splits) + { + int splitPoints = splits -1; + sb.append(" SPLIT ON("); + sb.append("?"); + for (int i = 1; i < splitPoints; i++) { + sb.append(",?"); + } + sb.append(")"); + } + + private static PreparedStatement createPreparedStatement(Connection connection, String newSql,byte [][] splitsBytes) throws SQLException { + final PreparedStatement statement = (PreparedStatement) connection.prepareStatement(newSql); + final int splitPoints = splitsBytes.length-1; + for (int i = 1; i <= splitPoints; i++) { + statement.setBytes(i, splitsBytes[i]); + } + return statement; + } + + protected static void createTestTableWithBinarySplit(String url, String ddl, byte[][] splits, Long ts) throws SQLException { + Assert.assertNotNull(ddl); + StringBuffer buf = new StringBuffer(ddl); + buildPreparedSqlWithBinarySplits(buf, splits.length); + + ddl = buf.toString(); + Properties props = new Properties(); + if (ts != null) { + props.setProperty("CurrentSCN", Long.toString(ts)); + } + + Connection conn = DriverManager.getConnection(url, props); + + try { + try(Statement stmt = conn.createStatement()) { + stmt.execute("DROP TABLE IF EXISTS " + TABLE_NAME); + } + try(PreparedStatement statement = createPreparedStatement(conn,ddl.toString(),splits)){ + statement.execute(); + } + try(Statement stmt = conn.createStatement()) { + stmt.execute("UPSERT INTO " + TABLE_NAME +" VALUES('1',1)"); + } + } catch (TableAlreadyExistsException var12) { + throw var12; + } finally { + conn.close(); + } + + } + + protected static void configureTestInput(JobConf jobConf){ + jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME,TABLE_NAME); + 
jobConf.set("ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR",""); + jobConf.set("PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING","v1:V1,v2:V2"); + jobConf.set("phoenix.zookeeper.quorum","localhost"); + jobConf.set("phoenix.zookeeper.client.port",String.valueOf(getZKClientPort(jobConf))); + jobConf.set("mapreduce.input.fileinputformat.inputdir","/tmp"); + } +} From 512580e3dc2fb3405b4ff2ef40395789e67d3f46 Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 02/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables. --- .../hive/mapreduce/PhoenixInputFormat.java | 198 ++++++++++++------ .../hive/HivePhoenixInputFormatTest.java | 85 ++++++-- 2 files changed, 202 insertions(+), 81 deletions(-) diff --git a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java index 0c81d4b5..c1c8e153 100644 --- a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java +++ b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -22,10 +22,13 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.List; import java.util.Properties; -import java.util.concurrent.*; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; @@ -123,37 +126,60 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException private List generateSplits(final JobConf jobConf, final QueryPlan qplan, final List splits, String query) throws IOException { - if (qplan == null){ + if (qplan == null) 
{ throw new NullPointerException(); - }if (splits == null){ + } + if (splits == null) { throw new NullPointerException(); } final List psplits = new ArrayList<>(splits.size()); - final Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims() - .newJobContext(new Job(jobConf))); - final boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS, + final Path[] tablePaths = FileInputFormat.getInputPaths( + ShimLoader.getHadoopShims().newJobContext(new Job(jobConf))); + final boolean splitByStats = jobConf.getBoolean( + PhoenixStorageHandlerConstants.SPLIT_BY_STATS, false); - final int parallelThreshould = jobConf.getInt("hive.phoenix.split.parallel.threshold", 32); + final int parallelThreshould = jobConf.getInt( + "hive.phoenix.split.parallel.threshold", + 32); setScanCacheSize(jobConf); - if ( (parallelThreshould) <=0 || (qplan.getScans().size() < parallelThreshould) ){ + if (needGenSplitSerially(parallelThreshould, qplan)) { LOG.info("generate splits in serial"); for (final List scans : qplan.getScans()) { - psplits.addAll(generateSplitsInternal(jobConf,qplan, splits, query,scans,splitByStats,tablePaths)); + psplits.addAll( + generateSplitsInternal( + jobConf, + qplan, + splits, + query, + scans, + splitByStats, + tablePaths) + ); } } else { - final int parallism = jobConf.getInt("hive.phoenix.split.parallel.level",Runtime.getRuntime().availableProcessors()*2); - ExecutorService executorService = Executors.newFixedThreadPool(parallism); - LOG.info("generate splits in parallel with {} threads",parallism); + final int parallism = jobConf.getInt( + "hive.phoenix.split.parallel.level", + Runtime.getRuntime().availableProcessors() * 2); + ExecutorService executorService = Executors.newFixedThreadPool( + parallism); + LOG.info("generate splits in parallel with {} threads", parallism); List>> tasks = new ArrayList<>(); - try{ + try { for (final List scans : qplan.getScans()) { - Future> task= executorService.submit(new 
Callable>() { + Future> task = executorService.submit( + new Callable>() { @Override public List call() throws Exception { - return generateSplitsInternal(jobConf,qplan, splits, query,scans,splitByStats,tablePaths); + return generateSplitsInternal(jobConf, + qplan, + splits, + query, + scans, + splitByStats, + tablePaths); } }); tasks.add(task); @@ -161,69 +187,123 @@ public List call() throws Exception { for (Future> task : tasks) { psplits.addAll(task.get()); } - }catch (ExecutionException | InterruptedException exception){ - throw new RuntimeException("failed to get splits,reason:",exception); - }finally { + } catch (ExecutionException | InterruptedException exception) { + throw new IOException("failed to get splits,reason:", + exception); + } finally { executorService.shutdown(); } } return psplits; } - private List generateSplitsInternal(final JobConf jobConf, final QueryPlan qplan, - final List splits, final String query,final List scans,final boolean splitByStats,final Path[] tablePaths) throws - IOException { + private boolean needGenSplitSerially(int threshold, QueryPlan qplan) { + if (threshold <= 0) { + return true; + } + if (qplan.getScans().size() < threshold) { + return true; + } + return false; + } + private List generateSplitsInternal(final JobConf jobConf, + final QueryPlan qplan, + final List splits, + final String query, + final List scans, + final boolean splitByStats, + final Path[] tablePaths) throws IOException { final List psplits = new ArrayList<>(scans.size()); - try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection( + try (org.apache.hadoop.hbase.client.Connection connection = + ConnectionFactory.createConnection( PhoenixConnectionUtil.getConfiguration(jobConf))) { - RegionLocator - regionLocator = + RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf( - qplan.getTableRef().getTable().getPhysicalName().toString())); - - { - PhoenixInputSplit inputSplit; - - HRegionLocation 
location = regionLocator.getRegionLocation(scans.get(0).getStartRow(), false); - long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location); - String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG); + qplan.getTableRef().getTable() + .getPhysicalName().toString())); + PhoenixInputSplit inputSplit; - if (splitByStats) { - for (Scan aScan : scans) { - if (LOG.isDebugEnabled()) { - LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan.getAttributesMap() - + " [scanCache, cacheBlock, scanBatch] : [" + aScan.getCaching() - + ", " + aScan.getCacheBlocks() + ", " + aScan.getBatch() + "] and regionLocation : " + regionLocation); - } + HRegionLocation location = regionLocator.getRegionLocation( + scans.get(0).getStartRow(), + false); + long regionSize = CompatUtil.getSize(regionLocator, + connection.getAdmin(), + location); + String regionLocation = + PhoenixStorageHandlerUtil.getRegionLocation(location, + LOG); - inputSplit = - new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)), - tablePaths[0], regionLocation, regionSize); - inputSplit.setQuery(query); - psplits.add(inputSplit); - } - } else { + if (splitByStats) { + for (Scan aScan : scans) { if (LOG.isDebugEnabled()) { - LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary( - scans.get(0).getStartRow()) + " ~ " + Bytes.toStringBinary( - scans.get(scans.size() - 1).getStopRow())); - LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans.get(0).getAttributesMap() - + " [scanCache, cacheBlock, scanBatch] : " + "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks() - + ", " + scans.get(0).getBatch() + "] and regionLocation : " + LOG.debug("Split for scan : " + + aScan + + "with scanAttribute : " + + aScan.getAttributesMap() + + " [scanCache, cacheBlock, scanBatch] : [" + + aScan.getCaching() + + ", " + + aScan.getCacheBlocks() + + ", " + + aScan.getBatch() + + "] and regionLocation : " + 
regionLocation); - - for (int i = 0, limit = scans.size(); i < limit; i++) { - LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes.toStringBinary(scans.get(i).getAttribute( - BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY))); - } } inputSplit = - new PhoenixInputSplit(scans, tablePaths[0], regionLocation, regionSize); + new PhoenixInputSplit( + new ArrayList<>(Arrays.asList(aScan)), + tablePaths[0], + regionLocation, + regionSize); inputSplit.setQuery(query); psplits.add(inputSplit); } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Scan count[" + + scans.size() + + "] : " + + Bytes.toStringBinary( + scans.get(0).getStartRow()) + + " ~ " + + Bytes.toStringBinary( + scans.get(scans.size() - 1).getStopRow())); + LOG.debug("First scan : " + + scans.get(0) + + "with scanAttribute : " + + scans.get(0).getAttributesMap() + + " [scanCache, cacheBlock, scanBatch] : " + + "[" + scans.get(0).getCaching() + + ", " + + scans.get(0).getCacheBlocks() + + ", " + + scans.get(0).getBatch() + + "] and regionLocation : " + + regionLocation); + + for (int i = 0, limit = scans.size(); i < limit; i++) { + LOG.debug( + "EXPECTED_UPPER_REGION_KEY[" + + i + + "] : " + + Bytes.toStringBinary( + scans.get(i).getAttribute( + BaseScannerRegionObserver + .EXPECTED_UPPER_REGION_KEY + ) + )); + } + } + + inputSplit = + new PhoenixInputSplit(scans, + tablePaths[0], + regionLocation, + regionSize); + inputSplit.setQuery(query); + psplits.add(inputSplit); } } return psplits; diff --git a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java index f7370326..85966b20 100644 --- a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java +++ b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java @@ -35,7 +35,11 @@ import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; -import 
java.sql.*; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; import java.util.Locale; import java.util.Properties; @@ -45,18 +49,31 @@ @NotThreadSafe @Category(ParallelStatsDisabledTest.class) public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT { - private static final Logger LOG = LoggerFactory.getLogger(HivePhoenixInputFormatTest.class); - private static final String TABLE_NAME = "HivePhoenixInputFormatTest".toUpperCase(Locale.ROOT); - private static final String DDL = "CREATE TABLE " + TABLE_NAME + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)"; + private static final Logger LOG = LoggerFactory.getLogger( + HivePhoenixInputFormatTest.class); + private static final String TABLE_NAME = "HivePhoenixInputFormatTest" + .toUpperCase(Locale.ROOT); + private static final String DDL = "CREATE TABLE " + + TABLE_NAME + + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)"; private static final int SPLITS = 128; + /* + * + * This test will create phoenix table with 128 splits and compare + * performance of split generation in serial/parallel + * + * */ @Test public void testGetSplitsSerialOrParallel() throws IOException,SQLException { - PhoenixInputFormat inputFormat = new PhoenixInputFormat(); + PhoenixInputFormat inputFormat = + new PhoenixInputFormat(); long start,end; // create table with N splits - System.out.println(String.format("generate testing table with %s splits",String.valueOf(SPLITS))); + System.out.println( + String.format("generate testing table with %s splits", + String.valueOf(SPLITS))); setupTestTable(); // setup configuration required for PhoenixInputFormat Configuration conf = getUtility().getConfiguration(); @@ -70,22 +87,30 @@ public void testGetSplitsSerialOrParallel() throws IOException,SQLException { InputSplit[] inputSplitsSerial = inputFormat.getSplits(jobConf,SPLITS); end = System.currentTimeMillis(); long 
durationInSerial=end - start; - System.out.println(String.format("get split in serial requires:%s ms",String.valueOf(durationInSerial))); + System.out.println(String.format( + "get split in serial requires:%s ms", + String.valueOf(durationInSerial))); // test get splits in parallel start = System.currentTimeMillis(); - jobConf.set("hive.phoenix.split.parallel.threshold","1"); - InputSplit[] inputSplitsParallel = inputFormat.getSplits(jobConf,SPLITS); + jobConf.set("hive.phoenix.split.parallel.threshold", "1"); + InputSplit[] inputSplitsParallel = inputFormat.getSplits( + jobConf, + SPLITS); end = System.currentTimeMillis(); long durationInParallel=end - start; - System.out.println(String.format("get split in parallel requires:%s ms",String.valueOf(durationInParallel))); + System.out.println(String.format( + "get split in parallel requires:%s ms", + String.valueOf(durationInParallel))); // Test if performance of parallel method is better than serial method Assert.assertTrue(durationInParallel < durationInSerial); // Test if the input split returned by serial method and parallel method are the same Assert.assertTrue(inputSplitsParallel.length==SPLITS); - Assert.assertTrue(inputSplitsParallel.length == inputSplitsSerial.length); + Assert.assertTrue( + inputSplitsParallel.length == inputSplitsSerial.length + ); for (final InputSplit inputSplitParallel:inputSplitsParallel){ boolean match=false; for (final InputSplit inputSplitSerial:inputSplitsSerial){ @@ -105,7 +130,9 @@ private static void setupTestTable() throws SQLException { createTestTableWithBinarySplit(getUrl(),DDL, splits ,null); } - private static void buildPreparedSqlWithBinarySplits(StringBuffer sb,int splits) + private static void buildPreparedSqlWithBinarySplits( + StringBuffer sb, + int splits) { int splitPoints = splits -1; sb.append(" SPLIT ON("); @@ -116,8 +143,11 @@ private static void buildPreparedSqlWithBinarySplits(StringBuffer sb,int splits) sb.append(")"); } - private static PreparedStatement 
createPreparedStatement(Connection connection, String newSql,byte [][] splitsBytes) throws SQLException { - final PreparedStatement statement = (PreparedStatement) connection.prepareStatement(newSql); + private static PreparedStatement createPreparedStatement(Connection connection, + String newSql, + byte [][] splitsBytes) throws SQLException { + final PreparedStatement statement = (PreparedStatement) connection + .prepareStatement(newSql); final int splitPoints = splitsBytes.length-1; for (int i = 1; i <= splitPoints; i++) { statement.setBytes(i, splitsBytes[i]); @@ -125,7 +155,11 @@ private static PreparedStatement createPreparedStatement(Connection connection, return statement; } - protected static void createTestTableWithBinarySplit(String url, String ddl, byte[][] splits, Long ts) throws SQLException { + protected static void createTestTableWithBinarySplit( + String url, + String ddl, + byte[][] splits, + Long ts) throws SQLException { Assert.assertNotNull(ddl); StringBuffer buf = new StringBuffer(ddl); buildPreparedSqlWithBinarySplits(buf, splits.length); @@ -142,11 +176,13 @@ protected static void createTestTableWithBinarySplit(String url, String ddl, byt try(Statement stmt = conn.createStatement()) { stmt.execute("DROP TABLE IF EXISTS " + TABLE_NAME); } - try(PreparedStatement statement = createPreparedStatement(conn,ddl.toString(),splits)){ + try(PreparedStatement statement = createPreparedStatement(conn, + ddl.toString(), + splits)){ statement.execute(); } try(Statement stmt = conn.createStatement()) { - stmt.execute("UPSERT INTO " + TABLE_NAME +" VALUES('1',1)"); + stmt.execute("UPSERT INTO " + TABLE_NAME +" VALUES('1' ,1)"); } } catch (TableAlreadyExistsException var12) { throw var12; @@ -157,11 +193,16 @@ protected static void createTestTableWithBinarySplit(String url, String ddl, byt } protected static void configureTestInput(JobConf jobConf){ - jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME,TABLE_NAME); - 
jobConf.set("ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR",""); - jobConf.set("PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING","v1:V1,v2:V2"); + jobConf.set( + PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME, + TABLE_NAME); + jobConf.set("ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR", ""); + jobConf.set("PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING", + "v1:V1,v2:V2"); jobConf.set("phoenix.zookeeper.quorum","localhost"); - jobConf.set("phoenix.zookeeper.client.port",String.valueOf(getZKClientPort(jobConf))); - jobConf.set("mapreduce.input.fileinputformat.inputdir","/tmp"); + jobConf.set("phoenix.zookeeper.client.port", + String.valueOf(getZKClientPort(jobConf))); + jobConf.set("mapreduce.input.fileinputformat.inputdir", + "/tmp"); } } From 81a157acbb017deeb9b3a6d6aab457f676578797 Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 03/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables. 
--- .../hive/mapreduce/PhoenixInputFormat.java | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java index c1c8e153..0cfac55e 100644 --- a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java +++ b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -143,7 +143,11 @@ private List generateSplits(final JobConf jobConf, final QueryPlan q "hive.phoenix.split.parallel.threshold", 32); setScanCacheSize(jobConf); - if (needGenSplitSerially(parallelThreshould, qplan)) { + if ( + (parallelThreshould <= 0) + || + (qplan.getScans().size() < parallelThreshould) + ) { LOG.info("generate splits in serial"); for (final List scans : qplan.getScans()) { psplits.addAll( @@ -196,16 +200,18 @@ public List call() throws Exception { } return psplits; } - - private boolean needGenSplitSerially(int threshold, QueryPlan qplan) { - if (threshold <= 0) { - return true; - } - if (qplan.getScans().size() < threshold) { - return true; - } - return false; - } + /** + * This method is used to generate splits for each scan list. 
+ * @param jobConf MapReduce Job Configuration + * @param qplan phoenix query plan + * @param splits phoenix table splits + * @param query phoenix query statement + * @param scans scan list slice of query plan + * @param splitByStats split by stat enabled + * @param tablePaths table paths + * @return List of Input Splits + * @throws IOException if function fails + */ private List generateSplitsInternal(final JobConf jobConf, final QueryPlan qplan, final List splits, From b0a9458923ecef5280a6c94c543b0ed49454731e Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 04/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables. --- .../hive/mapreduce/PhoenixInputFormat.java | 251 +++++++----------- .../hive/HivePhoenixInputFormatTest.java | 177 ++++++------ 2 files changed, 187 insertions(+), 241 deletions(-) diff --git a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java index 0cfac55e..fe66d328 100644 --- a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java +++ b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -22,13 +22,14 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.List; -import java.util.Properties; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; @@ -53,6 +54,7 @@ import org.apache.hadoop.mapreduce.Job; import 
org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.phoenix.compat.CompatUtil; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; @@ -66,7 +68,6 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.compat.CompatUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +79,6 @@ public class PhoenixInputFormat implements InputFormat { private static final Logger LOG = LoggerFactory.getLogger(PhoenixInputFormat.class); - public PhoenixInputFormat() { if (LOG.isDebugEnabled()) { LOG.debug("PhoenixInputFormat created"); @@ -124,8 +124,9 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException } private List generateSplits(final JobConf jobConf, final QueryPlan qplan, - final List splits, String query) throws - IOException { + final List splits, final String query) + throws IOException { + if (qplan == null) { throw new NullPointerException(); } @@ -139,67 +140,65 @@ private List generateSplits(final JobConf jobConf, final QueryPlan q final boolean splitByStats = jobConf.getBoolean( PhoenixStorageHandlerConstants.SPLIT_BY_STATS, false); - final int parallelThreshould = jobConf.getInt( - "hive.phoenix.split.parallel.threshold", - 32); + final int parallelThreshold = jobConf.getInt("hive.phoenix.split.parallel.threshold", + 8); setScanCacheSize(jobConf); - if ( - (parallelThreshould <= 0) - || - (qplan.getScans().size() < parallelThreshould) - ) { - LOG.info("generate splits in serial"); - for (final List scans : qplan.getScans()) { - psplits.addAll( - generateSplitsInternal( - jobConf, - qplan, - splits, - query, - scans, - splitByStats, - tablePaths) - ); - } - } else { - final int parallism = 
jobConf.getInt( - "hive.phoenix.split.parallel.level", - Runtime.getRuntime().availableProcessors() * 2); - ExecutorService executorService = Executors.newFixedThreadPool( - parallism); - LOG.info("generate splits in parallel with {} threads", parallism); + try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory + .createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) { + final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf( + qplan.getTableRef().getTable().getPhysicalName().toString())); + final int scanSize = qplan.getScans().size(); + if (needRunInParallel(parallelThreshold, scanSize)) { + final int parallism = jobConf.getInt("hive.phoenix.split.parallel.level", + Runtime.getRuntime().availableProcessors() * 2); + ExecutorService executorService = Executors.newFixedThreadPool(parallism); + LOG.info("generate splits in parallel with {} threads", parallism); - List>> tasks = new ArrayList<>(); + List>> tasks = new ArrayList<>(); - try { - for (final List scans : qplan.getScans()) { - Future> task = executorService.submit( - new Callable>() { - @Override - public List call() throws Exception { - return generateSplitsInternal(jobConf, - qplan, - splits, - query, - scans, - splitByStats, - tablePaths); - } - }); - tasks.add(task); + try { + for (final List scans : qplan.getScans()) { + Future> task = executorService.submit( + new Callable>() { + @Override public List call() throws Exception { + return generateSplitsInternal(jobConf, qplan, splits, query, + scans, splitByStats, connection, regionLocator, + tablePaths); + } + }); + tasks.add(task); + } + for (Future> task : tasks) { + psplits.addAll(task.get()); + } + } catch (ExecutionException | InterruptedException exception) { + throw new IOException("failed to get splits,reason:", exception); + } finally { + executorService.shutdown(); } - for (Future> task : tasks) { - psplits.addAll(task.get()); + } else { + LOG.info("generate splits in serial"); + 
for (final List scans : qplan.getScans()) { + psplits.addAll(generateSplitsInternal(jobConf, qplan, splits, query, scans, + splitByStats, connection, regionLocator, tablePaths)); } - } catch (ExecutionException | InterruptedException exception) { - throw new IOException("failed to get splits,reason:", - exception); - } finally { - executorService.shutdown(); } } + return psplits; } + + /* + * This method is used to check whether need to run in parallel to reduce + * time costs. + * @param parallelThreshold + * @param scans: number of scans + * @return true indicates should generate split in parallel. + * */ + private boolean needRunInParallel(int parallelThreshold, int scans) { + return parallelThreshold > 0 && scans >= parallelThreshold; + } + /** * This method is used to generate splits for each scan list. * @param jobConf MapReduce Job Configuration @@ -209,109 +208,63 @@ public List call() throws Exception { * @param scans scan list slice of query plan * @param splitByStats split by stat enabled * @param tablePaths table paths + * @param connection phoenix connection + * @param regionLocator * @return List of Input Splits * @throws IOException if function fails */ - private List generateSplitsInternal(final JobConf jobConf, - final QueryPlan qplan, - final List splits, - final String query, - final List scans, - final boolean splitByStats, - final Path[] tablePaths) throws IOException { + private List generateSplitsInternal(final JobConf jobConf, final QueryPlan qplan, + final List splits, final String query, final List scans, + final boolean splitByStats, final org.apache.hadoop.hbase.client.Connection connection, + final RegionLocator regionLocator, final Path[] tablePaths) throws IOException { final List psplits = new ArrayList<>(scans.size()); - try (org.apache.hadoop.hbase.client.Connection connection = - ConnectionFactory.createConnection( - PhoenixConnectionUtil.getConfiguration(jobConf))) { - RegionLocator regionLocator = - 
connection.getRegionLocator(TableName.valueOf( - qplan.getTableRef().getTable() - .getPhysicalName().toString())); - PhoenixInputSplit inputSplit; - - HRegionLocation location = regionLocator.getRegionLocation( - scans.get(0).getStartRow(), - false); - long regionSize = CompatUtil.getSize(regionLocator, - connection.getAdmin(), - location); - String regionLocation = - PhoenixStorageHandlerUtil.getRegionLocation(location, - LOG); - - if (splitByStats) { - for (Scan aScan : scans) { - if (LOG.isDebugEnabled()) { - LOG.debug("Split for scan : " - + aScan - + "with scanAttribute : " - + aScan.getAttributesMap() - + " [scanCache, cacheBlock, scanBatch] : [" - + aScan.getCaching() - + ", " - + aScan.getCacheBlocks() - + ", " - + aScan.getBatch() - + "] and regionLocation : " - + regionLocation); - } - inputSplit = - new PhoenixInputSplit( - new ArrayList<>(Arrays.asList(aScan)), - tablePaths[0], - regionLocation, - regionSize); - inputSplit.setQuery(query); - psplits.add(inputSplit); - } - } else { + PhoenixInputSplit inputSplit; + + HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow(), + false); + long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location); + String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG); + + if (splitByStats) { + for (Scan aScan : scans) { if (LOG.isDebugEnabled()) { - LOG.debug("Scan count[" - + scans.size() - + "] : " - + Bytes.toStringBinary( - scans.get(0).getStartRow()) - + " ~ " - + Bytes.toStringBinary( - scans.get(scans.size() - 1).getStopRow())); - LOG.debug("First scan : " - + scans.get(0) - + "with scanAttribute : " - + scans.get(0).getAttributesMap() - + " [scanCache, cacheBlock, scanBatch] : " - + "[" + scans.get(0).getCaching() - + ", " - + scans.get(0).getCacheBlocks() - + ", " - + scans.get(0).getBatch() - + "] and regionLocation : " - + regionLocation); - - for (int i = 0, limit = scans.size(); i < limit; i++) { - LOG.debug( - 
"EXPECTED_UPPER_REGION_KEY[" - + i - + "] : " - + Bytes.toStringBinary( - scans.get(i).getAttribute( - BaseScannerRegionObserver - .EXPECTED_UPPER_REGION_KEY - ) - )); - } + LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + + aScan.getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" + + aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + + aScan.getBatch() + "] and regionLocation : " + regionLocation); } - inputSplit = - new PhoenixInputSplit(scans, - tablePaths[0], - regionLocation, - regionSize); + inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)), + tablePaths[0], regionLocation, regionSize); inputSplit.setQuery(query); psplits.add(inputSplit); } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans + .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans + .size() - 1).getStopRow())); + + LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans + .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " + + "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks() + + ", " + scans.get(0).getBatch() + "] and regionLocation : " + + regionLocation); + + for (int i = 0, limit = scans.size(); i < limit; i++) { + LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes + .toStringBinary(scans.get(i).getAttribute + (BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY))); + } + } + + inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation, regionSize); + inputSplit.setQuery(query); + psplits.add(inputSplit); } + return psplits; } diff --git a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java index 85966b20..08b80f98 100644 --- a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java +++ 
b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java @@ -17,6 +17,17 @@ */ package org.apache.phoenix.hive; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Locale; +import java.util.Properties; + +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.InputSplit; @@ -33,15 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.NotThreadSafe; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Locale; -import java.util.Properties; + /** * Test class for Hive PhoenixInputFormat @@ -49,73 +52,59 @@ @NotThreadSafe @Category(ParallelStatsDisabledTest.class) public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT { - private static final Logger LOG = LoggerFactory.getLogger( - HivePhoenixInputFormatTest.class); - private static final String TABLE_NAME = "HivePhoenixInputFormatTest" - .toUpperCase(Locale.ROOT); - private static final String DDL = "CREATE TABLE " - + TABLE_NAME + private static final Logger LOG = LoggerFactory.getLogger(HivePhoenixInputFormatTest.class); + private static final String TABLE_NAME = "HivePhoenixInputFormatTest".toUpperCase(Locale.ROOT); + private static final String DDL = "CREATE TABLE " + TABLE_NAME + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)"; private static final int SPLITS = 128; - /* - * - * This test will create phoenix table with 128 splits and compare - * performance of split generation in serial/parallel - * - * */ + // This test will create phoenix table with 128 splits and compare performance of + // serial 
split-generation method and parallel split-generation method. @Test - public void testGetSplitsSerialOrParallel() throws IOException,SQLException { + public void testGetSplitsSerialOrParallel() throws IOException, SQLException { PhoenixInputFormat inputFormat = new PhoenixInputFormat(); - long start,end; - + long start; + long end; // create table with N splits System.out.println( - String.format("generate testing table with %s splits", - String.valueOf(SPLITS))); + String.format("generate testing table with %s splits", String.valueOf(SPLITS))); setupTestTable(); // setup configuration required for PhoenixInputFormat Configuration conf = getUtility().getConfiguration(); JobConf jobConf = new JobConf(conf); configureTestInput(jobConf); - - + inputFormat.getSplits(jobConf, SPLITS); + InputSplit[] inputSplitsSerial; // test get splits in serial start = System.currentTimeMillis(); - jobConf.set("hive.phoenix.split.parallel.threshold","0"); - InputSplit[] inputSplitsSerial = inputFormat.getSplits(jobConf,SPLITS); + jobConf.set("hive.phoenix.split.parallel.threshold", "0"); + inputSplitsSerial = inputFormat.getSplits(jobConf, SPLITS); end = System.currentTimeMillis(); - long durationInSerial=end - start; - System.out.println(String.format( - "get split in serial requires:%s ms", + long durationInSerial = end - start; + System.out.println(String.format("get split in serial requires:%s ms", String.valueOf(durationInSerial))); // test get splits in parallel start = System.currentTimeMillis(); jobConf.set("hive.phoenix.split.parallel.threshold", "1"); - InputSplit[] inputSplitsParallel = inputFormat.getSplits( - jobConf, - SPLITS); + InputSplit[] inputSplitsParallel = inputFormat.getSplits(jobConf, SPLITS); end = System.currentTimeMillis(); - long durationInParallel=end - start; + long durationInParallel = end - start; - System.out.println(String.format( - "get split in parallel requires:%s ms", + System.out.println(String.format("get split in parallel requires:%s ms", 
String.valueOf(durationInParallel))); // Test if performance of parallel method is better than serial method Assert.assertTrue(durationInParallel < durationInSerial); // Test if the input split returned by serial method and parallel method are the same - Assert.assertTrue(inputSplitsParallel.length==SPLITS); - Assert.assertTrue( - inputSplitsParallel.length == inputSplitsSerial.length - ); - for (final InputSplit inputSplitParallel:inputSplitsParallel){ - boolean match=false; - for (final InputSplit inputSplitSerial:inputSplitsSerial){ - if (inputSplitParallel.equals(inputSplitSerial)){ - match=true; + Assert.assertTrue(inputSplitsParallel.length == SPLITS); + Assert.assertTrue(inputSplitsParallel.length == inputSplitsSerial.length); + for (final InputSplit inputSplitParallel : inputSplitsParallel) { + boolean match = false; + for (final InputSplit inputSplitSerial : inputSplitsSerial) { + if (inputSplitParallel.equals(inputSplitSerial)) { + match = true; break; } } @@ -123,86 +112,90 @@ public void testGetSplitsSerialOrParallel() throws IOException,SQLException { } } + // create phoenix table with 128 splits. 
private static void setupTestTable() throws SQLException { - final byte [] start=new byte[0]; - final byte [] end = Bytes.createMaxByteArray(4); - final byte[][] splits = Bytes.split(start, end, SPLITS-2); - createTestTableWithBinarySplit(getUrl(),DDL, splits ,null); + if (SPLITS > 1) { + final byte[] start = new byte[0]; + final byte[] end = Bytes.createMaxByteArray(4); + final byte[][] splits = Bytes.split(start, end, SPLITS - 1); + System.out.println(String.valueOf(splits.length)); + createTestTableWithBinarySplit(getUrl(), DDL, splits, null); + } else { + createTestTableWithBinarySplit(getUrl(), DDL, null, null); + } } - private static void buildPreparedSqlWithBinarySplits( - StringBuffer sb, - int splits) - { - int splitPoints = splits -1; + // build prepared sql statement with splits + private static void buildPreparedSqlWithBinarySplits(StringBuffer sb, int splits) { + int splitPoints = splits - 2; sb.append(" SPLIT ON("); sb.append("?"); for (int i = 1; i < splitPoints; i++) { sb.append(",?"); } sb.append(")"); + System.out.println(sb.toString()); } - private static PreparedStatement createPreparedStatement(Connection connection, - String newSql, - byte [][] splitsBytes) throws SQLException { - final PreparedStatement statement = (PreparedStatement) connection - .prepareStatement(newSql); - final int splitPoints = splitsBytes.length-1; + // create prepared statement with params + private static PreparedStatement createPreparedStatement(Connection connection, String newSql, + byte[][] splitsBytes) throws SQLException { + final PreparedStatement statement = (PreparedStatement) connection.prepareStatement(newSql); + final int splitPoints = splitsBytes.length - 2; for (int i = 1; i <= splitPoints; i++) { statement.setBytes(i, splitsBytes[i]); } return statement; } - protected static void createTestTableWithBinarySplit( - String url, - String ddl, - byte[][] splits, + // create phoenix test table with splits and insert some testing data + protected static void 
createTestTableWithBinarySplit(String url, String ddl, byte[][] splits, Long ts) throws SQLException { - Assert.assertNotNull(ddl); - StringBuffer buf = new StringBuffer(ddl); - buildPreparedSqlWithBinarySplits(buf, splits.length); - - ddl = buf.toString(); Properties props = new Properties(); if (ts != null) { props.setProperty("CurrentSCN", Long.toString(ts)); } - Connection conn = DriverManager.getConnection(url, props); try { - try(Statement stmt = conn.createStatement()) { + // drop table first + try (Statement stmt = conn.createStatement()) { stmt.execute("DROP TABLE IF EXISTS " + TABLE_NAME); } - try(PreparedStatement statement = createPreparedStatement(conn, - ddl.toString(), - splits)){ - statement.execute(); + if (SPLITS > 1) { + //create table with splits + Assert.assertNotNull(ddl); + StringBuffer buf = new StringBuffer(ddl); + buildPreparedSqlWithBinarySplits(buf, splits.length); + ddl = buf.toString(); + try (PreparedStatement statement = createPreparedStatement(conn, ddl.toString(), + splits)) { + statement.execute(); + } + } else { + // create table without splits + try (Statement statement = conn.createStatement()) { + statement.execute(ddl); + } } - try(Statement stmt = conn.createStatement()) { - stmt.execute("UPSERT INTO " + TABLE_NAME +" VALUES('1' ,1)"); + try (Statement stmt = conn.createStatement()) { + stmt.execute("UPSERT INTO " + TABLE_NAME + " VALUES('1' ,1)"); } } catch (TableAlreadyExistsException var12) { - throw var12; + throw var12; } finally { conn.close(); } } - protected static void configureTestInput(JobConf jobConf){ - jobConf.set( - PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME, - TABLE_NAME); + // setup jobConf used for testing + protected static void configureTestInput(JobConf jobConf) { + jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME, TABLE_NAME); jobConf.set("ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR", ""); - jobConf.set("PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING", - "v1:V1,v2:V2"); - 
jobConf.set("phoenix.zookeeper.quorum","localhost"); - jobConf.set("phoenix.zookeeper.client.port", - String.valueOf(getZKClientPort(jobConf))); - jobConf.set("mapreduce.input.fileinputformat.inputdir", - "/tmp"); + jobConf.set("PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING", "v1:V1,v2:V2"); + jobConf.set("phoenix.zookeeper.quorum", "localhost"); + jobConf.set("phoenix.zookeeper.client.port", String.valueOf(getZKClientPort(jobConf))); + jobConf.set("mapreduce.input.fileinputformat.inputdir", "/tmp"); } } From cf8e003f59d89d2aed66dd765215e4bd7c9e5859 Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 05/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables. --- .../org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java index fe66d328..895208bb 100644 --- a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java +++ b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -172,7 +172,7 @@ private List generateSplits(final JobConf jobConf, final QueryPlan q psplits.addAll(task.get()); } } catch (ExecutionException | InterruptedException exception) { - throw new IOException("failed to get splits,reason:", exception); + throw new IOException("failed to generate splits, reason:", exception); } finally { executorService.shutdown(); } From 5c078a4b1efe5469a727f793ca740e9b2d6e2992 Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 06/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables. 
--- .../hive/mapreduce/PhoenixInputFormat.java | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java index 895208bb..538823be 100644 --- a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java +++ b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -161,9 +161,8 @@ private List generateSplits(final JobConf jobConf, final QueryPlan q Future> task = executorService.submit( new Callable>() { @Override public List call() throws Exception { - return generateSplitsInternal(jobConf, qplan, splits, query, - scans, splitByStats, connection, regionLocator, - tablePaths); + return generateSplitsInternal(query, scans, splitByStats, + connection, regionLocator, tablePaths); } }); tasks.add(task); @@ -179,7 +178,7 @@ private List generateSplits(final JobConf jobConf, final QueryPlan q } else { LOG.info("generate splits in serial"); for (final List scans : qplan.getScans()) { - psplits.addAll(generateSplitsInternal(jobConf, qplan, splits, query, scans, + psplits.addAll(generateSplitsInternal(query, scans, splitByStats, connection, regionLocator, tablePaths)); } } @@ -188,33 +187,28 @@ private List generateSplits(final JobConf jobConf, final QueryPlan q return psplits; } - /* - * This method is used to check whether need to run in parallel to reduce - * time costs. - * @param parallelThreshold - * @param scans: number of scans - * @return true indicates should generate split in parallel. - * */ - private boolean needRunInParallel(int parallelThreshold, int scans) { + /** + * This method is used to check whether need to run in parallel to reduce time costs. 
+ * @param parallelThreshold minimum number of scans required to trigger parallel generation + * @param scans number of scans + * @return true indicates should generate split in parallel. + */ + private boolean needRunInParallel(final int parallelThreshold, final int scans) { return parallelThreshold > 0 && scans >= parallelThreshold; } /** * This method is used to generate splits for each scan list. - * @param jobConf MapReduce Job Configuration - * @param qplan phoenix query plan - * @param splits phoenix table splits * @param query phoenix query statement * @param scans scan list slice of query plan * @param splitByStats split by stat enabled - * @param tablePaths table paths * @param connection phoenix connection - * @param regionLocator + * @param regionLocator HBase region locator + * @param tablePaths table paths * @return List of Input Splits * @throws IOException if function fails */ - private List generateSplitsInternal(final JobConf jobConf, final QueryPlan qplan, - final List splits, final String query, final List scans, + private List generateSplitsInternal(final String query, final List scans, final boolean splitByStats, final org.apache.hadoop.hbase.client.Connection connection, final RegionLocator regionLocator, final Path[] tablePaths) throws IOException { From d02d878657a659880da86b9ff9e75778de7c4bb5 Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 07/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables.
--- .../PhoenixStorageHandlerConstants.java | 19 ++++++++++++++++ .../hive/mapreduce/PhoenixInputFormat.java | 22 +++++++++++-------- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java index c78ce1af..0481b572 100644 --- a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java +++ b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java @@ -105,4 +105,23 @@ public class PhoenixStorageHandlerConstants { FUNCTION_VALUE_MARKER; public static final IntWritable INT_ZERO = new IntWritable(0); + + // Minimum number of parallel scans(Nps) threshold (Nt) required to trigger parallel split + // generation method (PSGM),instead of serial split generation method (SSGM). + // According to test, SSGM is better when Nps is less than Nt, when Nps is larger than Nt, + // PSGM will be better. + // Note: It is strongly recommend to leaf the setting with default,tuning the value doesn't + // make much difference. + public static final String PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD = + "phoenix.minimum.parallel.scans.threshold"; + public static final int DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD = 8; + + // Number of worker threads used to generate input splits using PSGM. + // Note: default setting is suitable for most use cases, + // you can set it to bigger value for seek better performance. 
+ public static final String PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT = + "phoenix.inputsplit.generation.thread.count"; + public static final int DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT = + Runtime.getRuntime().availableProcessors() * 2; + } diff --git a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java index 538823be..3a914399 100644 --- a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java +++ b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -140,19 +140,22 @@ private List generateSplits(final JobConf jobConf, final QueryPlan q final boolean splitByStats = jobConf.getBoolean( PhoenixStorageHandlerConstants.SPLIT_BY_STATS, false); - final int parallelThreshold = jobConf.getInt("hive.phoenix.split.parallel.threshold", - 8); + final int parallelThreshold = jobConf.getInt( + PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, + PhoenixStorageHandlerConstants.DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD); setScanCacheSize(jobConf); try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory .createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) { final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf( qplan.getTableRef().getTable().getPhysicalName().toString())); final int scanSize = qplan.getScans().size(); - if (needRunInParallel(parallelThreshold, scanSize)) { - final int parallism = jobConf.getInt("hive.phoenix.split.parallel.level", - Runtime.getRuntime().availableProcessors() * 2); + if (useParallelInputGeneration(parallelThreshold, scanSize)) { + final int parallism = jobConf.getInt( + PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT, + PhoenixStorageHandlerConstants + .DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT); 
ExecutorService executorService = Executors.newFixedThreadPool(parallism); - LOG.info("generate splits in parallel with {} threads", parallism); + LOG.info("Generate Input Splits in Parallel with {} threads", parallism); List>> tasks = new ArrayList<>(); @@ -171,12 +174,13 @@ private List generateSplits(final JobConf jobConf, final QueryPlan q psplits.addAll(task.get()); } } catch (ExecutionException | InterruptedException exception) { - throw new IOException("failed to generate splits, reason:", exception); + throw new IOException("Failed to Generate Input Splits in Parallel, reason:", + exception); } finally { executorService.shutdown(); } } else { - LOG.info("generate splits in serial"); + LOG.info("Generate Input Splits in Serial"); for (final List scans : qplan.getScans()) { psplits.addAll(generateSplitsInternal(query, scans, splitByStats, connection, regionLocator, tablePaths)); @@ -193,7 +197,7 @@ private List generateSplits(final JobConf jobConf, final QueryPlan q * @param scans number of scans * @return true indicates should generate split in parallel. */ - private boolean needRunInParallel(final int parallelThreshold, final int scans) { + private boolean useParallelInputGeneration(final int parallelThreshold, final int scans) { return parallelThreshold > 0 && scans >= parallelThreshold; } From a80749f0fd790c3de4ae214452b227e14574e2c6 Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 08/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables. 
--- .../PhoenixStorageHandlerConstants.java | 7 ++++--- .../hive/HivePhoenixInputFormatTest.java | 19 ++++++++++--------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java index 0481b572..c3763eca 100644 --- a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java +++ b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java @@ -110,15 +110,16 @@ public class PhoenixStorageHandlerConstants { // generation method (PSGM),instead of serial split generation method (SSGM). // According to test, SSGM is better when Nps is less than Nt, when Nps is larger than Nt, // PSGM will be better. - // Note: It is strongly recommend to leaf the setting with default,tuning the value doesn't - // make much difference. + // Note: It is strongly recommended to leave the setting at its default; tuning the value doesn't + // make much difference. If you prefer the legacy method (SSGM), set + // phoenix.minimum.parallel.scans.threshold = 0. public static final String PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD = "phoenix.minimum.parallel.scans.threshold"; public static final int DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD = 8; // Number of worker threads used to generate input splits using PSGM. // Note: default setting is suitable for most use cases, - // you can set it to bigger value for seek better performance. + // you can set it to a bigger value to get better performance.
public static final String PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT = "phoenix.inputsplit.generation.thread.count"; public static final int DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT = diff --git a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java index 08b80f98..ebdceafc 100644 --- a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java +++ b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java @@ -74,20 +74,13 @@ public void testGetSplitsSerialOrParallel() throws IOException, SQLException { Configuration conf = getUtility().getConfiguration(); JobConf jobConf = new JobConf(conf); configureTestInput(jobConf); + // warm up inputFormat.getSplits(jobConf, SPLITS); InputSplit[] inputSplitsSerial; - // test get splits in serial - start = System.currentTimeMillis(); - jobConf.set("hive.phoenix.split.parallel.threshold", "0"); - inputSplitsSerial = inputFormat.getSplits(jobConf, SPLITS); - end = System.currentTimeMillis(); - long durationInSerial = end - start; - System.out.println(String.format("get split in serial requires:%s ms", - String.valueOf(durationInSerial))); // test get splits in parallel start = System.currentTimeMillis(); - jobConf.set("hive.phoenix.split.parallel.threshold", "1"); + jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "1"); InputSplit[] inputSplitsParallel = inputFormat.getSplits(jobConf, SPLITS); end = System.currentTimeMillis(); long durationInParallel = end - start; @@ -95,6 +88,14 @@ public void testGetSplitsSerialOrParallel() throws IOException, SQLException { System.out.println(String.format("get split in parallel requires:%s ms", String.valueOf(durationInParallel))); + // test get splits in serial + start = System.currentTimeMillis(); + 
jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "0"); + inputSplitsSerial = inputFormat.getSplits(jobConf, SPLITS); + end = System.currentTimeMillis(); + long durationInSerial = end - start; + System.out.println(String.format("get split in serial requires:%s ms", + String.valueOf(durationInSerial))); // Test if performance of parallel method is better than serial method Assert.assertTrue(durationInParallel < durationInSerial); // Test if the input split returned by serial method and parallel method are the same From ea7aadc46c5e68ef67982fa755872e9f40afcd90 Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 09/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables. --- .../hive/HivePhoenixInputFormatTest.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java index ebdceafc..bff8c528 100644 --- a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java +++ b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java @@ -74,9 +74,16 @@ public void testGetSplitsSerialOrParallel() throws IOException, SQLException { Configuration conf = getUtility().getConfiguration(); JobConf jobConf = new JobConf(conf); configureTestInput(jobConf); - // warm up inputFormat.getSplits(jobConf, SPLITS); InputSplit[] inputSplitsSerial; + // test get splits in serial + start = System.currentTimeMillis(); + jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "0"); + inputSplitsSerial = inputFormat.getSplits(jobConf, SPLITS); + end = System.currentTimeMillis(); + long durationInSerial = end - start; + System.out.println(String.format("get split in serial requires:%s ms", + 
String.valueOf(durationInSerial))); // test get splits in parallel start = System.currentTimeMillis(); @@ -88,14 +95,6 @@ public void testGetSplitsSerialOrParallel() throws IOException, SQLException { System.out.println(String.format("get split in parallel requires:%s ms", String.valueOf(durationInParallel))); - // test get splits in serial - start = System.currentTimeMillis(); - jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "0"); - inputSplitsSerial = inputFormat.getSplits(jobConf, SPLITS); - end = System.currentTimeMillis(); - long durationInSerial = end - start; - System.out.println(String.format("get split in serial requires:%s ms", - String.valueOf(durationInSerial))); // Test if performance of parallel method is better than serial method Assert.assertTrue(durationInParallel < durationInSerial); // Test if the input split returned by serial method and parallel method are the same From 86a4a35c96ff2afdc5cda0662761b975a0f7f8f7 Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 10/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables. 
--- .../PhoenixStorageHandlerConstants.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java index c3763eca..8148ad4f 100644 --- a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java +++ b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java @@ -106,20 +106,24 @@ public class PhoenixStorageHandlerConstants { public static final IntWritable INT_ZERO = new IntWritable(0); - // Minimum number of parallel scans(Nps) threshold (Nt) required to trigger parallel split - // generation method (PSGM),instead of serial split generation method (SSGM). - // According to test, SSGM is better when Nps is less than Nt, when Nps is larger than Nt, - // PSGM will be better. - // Note: It is strongly recommend to leave the setting as default,tuning the value doesn't - // make much difference.If you persist to use legeacy method(SSGM),set - // phoenix.minimum.parallel.scans.threshold = 0 . + /** + * Minimum number of parallel scans(Nps) threshold (Nt) required to trigger parallel split + * generation method (PSGM),instead of serial split generation method (SSGM). + * According to test, SSGM is better when Nps is less than Nt, when Nps is larger than Nt, + * PSGM will be better. + * Note: It is strongly recommend to leave the setting as default,tuning the value doesn't + * make much difference.If you insist on using legacy method(SSGM),set + * phoenix.minimum.parallel.scans.threshold = 0 . 
+ */ public static final String PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD = "phoenix.minimum.parallel.scans.threshold"; public static final int DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD = 8; - // Number of worker threads used to generate input splits using PSGM. - // Note: default setting is suitable for most use cases, - // you can set it to bigger value properly to get better performance. + /** + * Number of worker threads used to generate input splits using PSGM. + * Note: default setting is suitable for most use cases, + * you can set it to bigger value properly to get better performance. + */ public static final String PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT = "phoenix.inputsplit.generation.thread.count"; public static final int DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT = From 9055eee77d13cafd001f80ab714d2844869913f1 Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 11/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables. --- .../hive/constants/PhoenixStorageHandlerConstants.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java index 8148ad4f..7408be4a 100644 --- a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java +++ b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java @@ -117,6 +117,9 @@ public class PhoenixStorageHandlerConstants { */ public static final String PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD = "phoenix.minimum.parallel.scans.threshold"; + /** + * Default minimum number of parallel scans threshold,value is acquired by local testing. 
+ */ public static final int DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD = 8; /** @@ -126,6 +129,9 @@ public class PhoenixStorageHandlerConstants { */ public static final String PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT = "phoenix.inputsplit.generation.thread.count"; + /** + * Default worker threads used to generate input splits using PSGM. + */ public static final int DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT = Runtime.getRuntime().availableProcessors() * 2; From 2005a3b952fa46a63e463859d149c96be9c84e7b Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 12/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables. --- .../org/apache/phoenix/hive/HivePhoenixInputFormatTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java index bff8c528..5f73b91f 100644 --- a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java +++ b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java @@ -56,7 +56,7 @@ public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT { private static final String TABLE_NAME = "HivePhoenixInputFormatTest".toUpperCase(Locale.ROOT); private static final String DDL = "CREATE TABLE " + TABLE_NAME + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)"; - private static final int SPLITS = 128; + private static final int SPLITS = 256; // This test will create phoenix table with 128 splits and compare performance of // serial split-generation method and parallel split-generation method. 
@@ -88,6 +88,7 @@ public void testGetSplitsSerialOrParallel() throws IOException, SQLException { // test get splits in parallel start = System.currentTimeMillis(); jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "1"); + jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT,"24"); InputSplit[] inputSplitsParallel = inputFormat.getSplits(jobConf, SPLITS); end = System.currentTimeMillis(); long durationInParallel = end - start; From 64fd5a1c822bcf7581235b645265188ba22ee30d Mon Sep 17 00:00:00 2001 From: jichen Date: Mon, 9 May 2022 22:25:16 +0800 Subject: [PATCH 13/13] PHOENIX-6698 hive-connector will take long time to generate splits for large phoenix tables. --- .../hive/mapreduce/PhoenixInputFormat.java | 19 +- .../hive/HivePhoenixInputFormatTest.java | 389 +++++++++++++----- 2 files changed, 293 insertions(+), 115 deletions(-) diff --git a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java index 3a914399..9e463b9b 100644 --- a/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java +++ b/phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -150,13 +150,12 @@ private List generateSplits(final JobConf jobConf, final QueryPlan q qplan.getTableRef().getTable().getPhysicalName().toString())); final int scanSize = qplan.getScans().size(); if (useParallelInputGeneration(parallelThreshold, scanSize)) { - final int parallism = jobConf.getInt( + final int parallelism = jobConf.getInt( PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT, PhoenixStorageHandlerConstants .DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT); - ExecutorService executorService = Executors.newFixedThreadPool(parallism); - LOG.info("Generate Input Splits in Parallel with {} threads", parallism); - + 
ExecutorService executorService = Executors.newFixedThreadPool(parallelism); + LOG.info("Generating Input Splits in Parallel with {} threads", parallelism); List>> tasks = new ArrayList<>(); try { @@ -173,14 +172,18 @@ private List generateSplits(final JobConf jobConf, final QueryPlan q for (Future> task : tasks) { psplits.addAll(task.get()); } - } catch (ExecutionException | InterruptedException exception) { - throw new IOException("Failed to Generate Input Splits in Parallel, reason:", - exception); + } catch (ExecutionException|InterruptedException e) { + Throwable throwable = e.getCause(); + if (throwable instanceof IOException) { + throw (IOException) throwable; + } else { + throw new IOException(e); + } } finally { executorService.shutdown(); } } else { - LOG.info("Generate Input Splits in Serial"); + LOG.info("Generating Input Splits in Serial"); for (final List scans : qplan.getScans()) { psplits.addAll(generateSplitsInternal(query, scans, splitByStats, connection, regionLocator, tablePaths)); diff --git a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java index 5f73b91f..8f7f5360 100644 --- a/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java +++ b/phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java @@ -19,28 +19,27 @@ import java.io.IOException; import java.sql.Connection; +import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; -import java.util.Locale; import java.util.Properties; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.annotation.concurrent.NotThreadSafe; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapred.InputSplit; import 
org.apache.hadoop.mapred.JobConf; -import org.apache.phoenix.end2end.ParallelStatsDisabledIT; -import org.apache.phoenix.end2end.ParallelStatsDisabledTest; +import org.apache.phoenix.end2end.ParallelStatsEnabledIT; import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat; import org.apache.phoenix.mapreduce.PhoenixRecordWritable; -import org.apache.phoenix.schema.TableAlreadyExistsException; import org.junit.Assert; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,56 +49,108 @@ * Test class for Hive PhoenixInputFormat */ @NotThreadSafe -@Category(ParallelStatsDisabledTest.class) -public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT { +public class HivePhoenixInputFormatTest extends ParallelStatsEnabledIT { private static final Logger LOG = LoggerFactory.getLogger(HivePhoenixInputFormatTest.class); - private static final String TABLE_NAME = "HivePhoenixInputFormatTest".toUpperCase(Locale.ROOT); - private static final String DDL = "CREATE TABLE " + TABLE_NAME - + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)"; - private static final int SPLITS = 256; + /** + * base table + */ + private static final String BASE_TEST_TABLE = "ENTITY_HISTORY"; - // This test will create phoenix table with 128 splits and compare performance of - // serial split-generation method and parallel split-generation method. 
+ /** + * Internal test table without salting + */ + private static final String INTERNAL_TEST_TABLE = "ENTITY_HISTORY"; + /** + * Internal salted test table + */ + private static final String INTERNAL_TEST_TABLE_SALTED = "ENTITY_HISTORY_SALTED"; + /** + * Regex Pattern for custom table,the table name pattern is + * ENTITY_HISTORY_[N_BUCKETS]_[N_GUIDEPOST]_[STAT_SWITCH] + * and will create the table with table option: + * SALT_BUCKETS=,GUIDE_POSTS_WIDTH + * if STAT_SWITCH=0,will disable ParallelStats for the table + */ + private static final Pattern CUSTOM_TABLE_PATTERN = + Pattern.compile("ENTITY_HISTORY_(\\d+_\\d+_\\d+)"); + + /** + * Internal test tables name and custom test tables for testing + */ + private static final String[] TEST_TABLES = new String[] { + // internal test table + INTERNAL_TEST_TABLE, + // internal salted table + INTERNAL_TEST_TABLE_SALTED, + // test table with 128 salt bucket, 128 guidepost width, parallel stat enabled + "ENTITY_HISTORY_128_128_1", + // test table with 128 salt bucket, 128 guidepost width, parallel stat disabled + "ENTITY_HISTORY_128_128_0", + // test table with 0 salt bucket, 128 guidepost width, parallel stat enabled + "ENTITY_HISTORY_0_128_1", + // test table with 0 salt bucket, 128 guidepost width, parallel stat disabled + "ENTITY_HISTORY_0_128_0", + // test table with 128 salt bucket, 1 guidepost width, parallel stat enabled + "ENTITY_HISTORY_128_1_1", + // test table with 128 salt bucket, 1 guidepost width, parallel stat disabled + "ENTITY_HISTORY_128_1_0" + }; + + /** + * This test will perform test for phoenix tables + * with different combinations of buckets,guideposts,and parallel stat switch + */ @Test - public void testGetSplitsSerialOrParallel() throws IOException, SQLException { - PhoenixInputFormat inputFormat = + public void testGetSplitWithMultiSplitsGuidePost() throws SQLException, IOException { + for (String table: TEST_TABLES) { + testTable(table); + } + } + + /** + * This test will perform test for 
phoenix tables + * with different combinations of buckets,guideposts,and parallel stat switch + */ + private static void testTable(String testTableName) throws SQLException, IOException { + TableCreationInfo tableCreationInfo = getTableCreationInfo(testTableName); + createTestTable(tableCreationInfo); + fillTestData(tableCreationInfo, 10, 100); + try { + Configuration conf = getUtility().getConfiguration(); + JobConf jobConf = new JobConf(conf); + configureTestInput(jobConf, tableCreationInfo); + assertSameResult(jobConf, tableCreationInfo); + } finally { + dropTestTable(tableCreationInfo); + } + } + + /** + * The function will compare the getSplits result returned by serial method and + * parallel method and assume they are equal. + */ + private static void assertSameResult(JobConf jobConf, TableCreationInfo creationInfo) + throws IOException { + final PhoenixInputFormat inputFormat = new PhoenixInputFormat(); - long start; - long end; - // create table with N splits - System.out.println( - String.format("generate testing table with %s splits", String.valueOf(SPLITS))); - setupTestTable(); - // setup configuration required for PhoenixInputFormat - Configuration conf = getUtility().getConfiguration(); - JobConf jobConf = new JobConf(conf); - configureTestInput(jobConf); - inputFormat.getSplits(jobConf, SPLITS); + final int splits = creationInfo.getBuckets() > 0 ? 
creationInfo.getBuckets() : 1; InputSplit[] inputSplitsSerial; - // test get splits in serial - start = System.currentTimeMillis(); - jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "0"); - inputSplitsSerial = inputFormat.getSplits(jobConf, SPLITS); - end = System.currentTimeMillis(); - long durationInSerial = end - start; - System.out.println(String.format("get split in serial requires:%s ms", - String.valueOf(durationInSerial))); + InputSplit[] inputSplitsParallel; // test get splits in parallel - start = System.currentTimeMillis(); jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "1"); - jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT,"24"); - InputSplit[] inputSplitsParallel = inputFormat.getSplits(jobConf, SPLITS); - end = System.currentTimeMillis(); - long durationInParallel = end - start; - - System.out.println(String.format("get split in parallel requires:%s ms", - String.valueOf(durationInParallel))); - - // Test if performance of parallel method is better than serial method - Assert.assertTrue(durationInParallel < durationInSerial); + jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT, + "24"); + inputSplitsParallel = inputFormat.getSplits(jobConf, splits); + // test get splits in serial + jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "0"); + inputSplitsSerial = inputFormat.getSplits(jobConf, splits); + System.out.println("table:" + + creationInfo.getTableName() + + "splits:" + + inputSplitsSerial.length + + "\n"); // Test if the input split returned by serial method and parallel method are the same - Assert.assertTrue(inputSplitsParallel.length == SPLITS); Assert.assertTrue(inputSplitsParallel.length == inputSplitsSerial.length); for (final InputSplit inputSplitParallel : inputSplitsParallel) { boolean match = false; @@ -113,90 +164,214 @@ public void 
testGetSplitsSerialOrParallel() throws IOException, SQLException { } } - // create phoenix table with 128 splits. - private static void setupTestTable() throws SQLException { - if (SPLITS > 1) { - final byte[] start = new byte[0]; - final byte[] end = Bytes.createMaxByteArray(4); - final byte[][] splits = Bytes.split(start, end, SPLITS - 1); - System.out.println(String.valueOf(splits.length)); - createTestTableWithBinarySplit(getUrl(), DDL, splits, null); + /** + * The function will create internal/custom table + * for testing. + */ + private static void createTestTable(TableCreationInfo creationInfo) throws SQLException { + if (creationInfo.isInternalTable()) { + ensureTableCreated(getUrl(), creationInfo.getTableName()); } else { - createTestTableWithBinarySplit(getUrl(), DDL, null, null); + createCustomTable(creationInfo.getTableName(), creationInfo.getBuckets(), + creationInfo.getGuidePost()); } } - - // build prepared sql statement with splits - private static void buildPreparedSqlWithBinarySplits(StringBuffer sb, int splits) { - int splitPoints = splits - 2; - sb.append(" SPLIT ON("); - sb.append("?"); - for (int i = 1; i < splitPoints; i++) { - sb.append(",?"); + /** + * The function will remove custom table + * for testing. internal tables will not be dropped + * they will be dropped automatically after testing. 
+ */ + private static void dropTestTable(TableCreationInfo creationInfo) throws SQLException { + if (creationInfo.isInternalTable()) { + return; } - sb.append(")"); - System.out.println(sb.toString()); - } - - // create prepared statement with params - private static PreparedStatement createPreparedStatement(Connection connection, String newSql, - byte[][] splitsBytes) throws SQLException { - final PreparedStatement statement = (PreparedStatement) connection.prepareStatement(newSql); - final int splitPoints = splitsBytes.length - 2; - for (int i = 1; i <= splitPoints; i++) { - statement.setBytes(i, splitsBytes[i]); + Properties props = new Properties(); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + Statement statement = conn.createStatement(); + statement.execute("DROP TABLE IF EXISTS " + creationInfo.getTableName()); } - return statement; } + /** + * The function is used to fill testing data + * for testing table + */ + private static void fillTestData(TableCreationInfo creationInfo, int batch, int count) + throws SQLException { + String testTableName = creationInfo.getTableName(); - // create phoenix test table with splits and insert some testing data - protected static void createTestTableWithBinarySplit(String url, String ddl, byte[][] splits, - Long ts) throws SQLException { Properties props = new Properties(); - if (ts != null) { - props.setProperty("CurrentSCN", Long.toString(ts)); + if (creationInfo.isEnableStat()) { + props.put("phoenix.stats.guidepost.width", Long.toString(20L)); + props.put("phoenix.stats.updateFrequency", Long.toString(1L)); + props.put("phoenix.coprocessor.maxMetaDataCacheTimeToLiveMs", Long.toString(1L)); + props.put("phoenix.use.stats.parallelization", Boolean.toString(true)); } - Connection conn = DriverManager.getConnection(url, props); + Connection conn = DriverManager.getConnection(getUrl(), props); + Date date = new Date(System.currentTimeMillis()); try { - // drop table first - try (Statement stmt = 
conn.createStatement()) { - stmt.execute("DROP TABLE IF EXISTS " + TABLE_NAME); - } - if (SPLITS > 1) { - //create table with splits - Assert.assertNotNull(ddl); - StringBuffer buf = new StringBuffer(ddl); - buildPreparedSqlWithBinarySplits(buf, splits.length); - ddl = buf.toString(); - try (PreparedStatement statement = createPreparedStatement(conn, ddl.toString(), - splits)) { - statement.execute(); - } - } else { - // create table without splits - try (Statement statement = conn.createStatement()) { - statement.execute(ddl); + try (PreparedStatement stmt = conn.prepareStatement( + "upsert into " + + testTableName + + "( " + + "ORGANIZATION_ID, " + + "PARENT_ID, " + + "CREATED_DATE, " + + "ENTITY_HISTORY_ID, " + + "OLD_VALUE, " + + "NEW_VALUE) VALUES (?, ?, ?, ?, ?, ?)" + )) { + for (int i = 0; i < batch; i++) { + for (int j = 0; j < count; j++) { + stmt.setString(1, UUID.randomUUID().toString().substring(0, 15)); + stmt.setString(2, UUID.randomUUID().toString().substring(0, 15)); + stmt.setDate(3, date); + stmt.setString(4, UUID.randomUUID().toString().substring(0, 15)); + stmt.setString(5, UUID.randomUUID().toString().substring(0, 15)); + stmt.setString(6, UUID.randomUUID().toString().substring(0, 15)); + stmt.execute(); + } + conn.commit(); } } +//// //forcefully update statistics try (Statement stmt = conn.createStatement()) { - stmt.execute("UPSERT INTO " + TABLE_NAME + " VALUES('1' ,1)"); + final String updateStatSql = "UPDATE STATISTICS " + testTableName; + stmt.execute(updateStatSql); } - } catch (TableAlreadyExistsException var12) { - throw var12; } finally { conn.close(); } - } + /** + * @description The function is used to create custom table with table option: + * SALT_BUCKETS=<@bucket>,GUIDE_POSTS_WIDTH<@guidepost> + * @param testTableName phoenix table name to create + * @param bucket salt bucket setting + * @param guidepost guidepost setting + */ + private static void createCustomTable(String testTableName, int bucket, + int guidepost) throws 
SQLException { + String tableOptions = " SALT_BUCKETS=" + String.valueOf(bucket) + + " ,GUIDE_POSTS_WIDTH=" + String.valueOf(guidepost); + ensureTableCreated(getUrl(), testTableName, BASE_TEST_TABLE, null, tableOptions); + } + - // setup jobConf used for testing - protected static void configureTestInput(JobConf jobConf) { - jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME, TABLE_NAME); + /** + * The function is used to setup JobConf used by HivePhoenixInputFormat + */ + protected static void configureTestInput(JobConf jobConf, TableCreationInfo creationInfo) { + jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME, creationInfo.getTableName()); jobConf.set("ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR", ""); jobConf.set("PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING", "v1:V1,v2:V2"); jobConf.set("phoenix.zookeeper.quorum", "localhost"); jobConf.set("phoenix.zookeeper.client.port", String.valueOf(getZKClientPort(jobConf))); + jobConf.set(PhoenixStorageHandlerConstants.SPLIT_BY_STATS, + String.valueOf(creationInfo.isEnableStat())); jobConf.set("mapreduce.input.fileinputformat.inputdir", "/tmp"); } + + /** + * The function is used to parse table creation info by using tableName + */ + public static TableCreationInfo getTableCreationInfo(String tableName) { + TableCreationInfo info = new TableCreationInfo(); + info.setTableName(tableName); + + if (tableName.equalsIgnoreCase(INTERNAL_TEST_TABLE)) { + info.setInternalTable(true); + info.setBuckets(0); + info.setGuidePost(-1); + info.setEnableStat(true); + } else if (tableName.equalsIgnoreCase(INTERNAL_TEST_TABLE_SALTED)) { + info.setInternalTable(true); + info.setBuckets(4); + info.setGuidePost(-1); + info.setEnableStat(true); + } else { + Matcher matcher = CUSTOM_TABLE_PATTERN.matcher(tableName); + if (matcher.find()) { + int bucket; + int guidepost; + int enableStat; + String group = matcher.group(1); + String[] params = group.split("_"); + bucket = Integer.parseInt(params[0]); + 
guidepost = Integer.parseInt(params[1]); + enableStat = Integer.parseInt(params[2]); + info.setInternalTable(false); + info.setBuckets(bucket); + info.setGuidePost(guidepost); + info.setEnableStat(enableStat > 0); + } + } + return info; + } + + /** + * Internal Pojo Class used to store settings used for table creation. + */ + private static class TableCreationInfo { + private String tableName; + private int buckets; + private int guidePost; + + private boolean enableStat; + private boolean isInternalTable; + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public int getBuckets() { + return buckets; + } + + public void setBuckets(int buckets) { + this.buckets = buckets; + } + + public int getGuidePost() { + return guidePost; + } + + public void setGuidePost(int guidePost) { + this.guidePost = guidePost; + } + + public boolean isEnableStat() { + return enableStat; + } + + public void setEnableStat(boolean enableStat) { + this.enableStat = enableStat; + } + + public boolean isInternalTable() { + return isInternalTable; + } + + public void setInternalTable(boolean internalTable) { + isInternalTable = internalTable; + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append(tableName); + sb.append("."); + sb.append(buckets); + sb.append("."); + sb.append(guidePost); + sb.append("."); + sb.append(enableStat); + return sb.toString(); + } + } + }