diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 8f8befd83480..84bab95653eb 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1291,6 +1291,21 @@ public enum ConfVars { "metastore.partition.management.table.pattern", "*", "Automatic partition management will look for tables using the specified table pattern"), + STATISTICS_MANAGEMENT_TASK_FREQUENCY("metastore.statistics.management.task.frequency", + "metastore.statistics.management.task.frequency", + 7, TimeUnit.DAYS, "Frequency at which timer task runs to do automatic statistics \n" + + "management for tables. Statistics management includes 2 configs. \n" + + "One is 'metastore.statistics.auto.deletion', and the other is 'metastore.statistics.retention.period'. \n" + + "When 'metastore.statistics.auto.deletion'='true' is set, statistics management will look for tables whose \n" + + "column statistics are over the retention period, and then delete the column stats. \n"), + STATISTICS_RETENTION_PERIOD("metastore.statistics.retention.period", + "metastore.statistics.retention.period", 365, TimeUnit.DAYS, "The retention period " + + "that we want to keep the stats for each table, which means if the stats are older than this period\n" + + "of time, the stats will be automatically deleted. \n"), + + STATISTICS_AUTO_DELETION("metastore.statistics.auto.deletion", "metastore.statistics.auto.deletion", false, + "Whether table/partition column statistics will be auto deleted after retention period"), + METASTORE_METADATA_TRANSFORMER_CLASS("metastore.metadata.transformer.class", "metastore.metadata.transformer.class", "org.apache.hadoop.hive.metastore.MetastoreDefaultTransformer", "Fully qualified class name for the metastore metadata transformer class \n" @@ -1526,7 +1541,8 @@ public enum ConfVars { ACID_METRICS_TASK_CLASS + "," + ACID_METRICS_LOGGER_CLASS + "," + "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask" + "," + "org.apache.hadoop.hive.metastore.ScheduledQueryExecutionsMaintTask" + "," - + "org.apache.hadoop.hive.metastore.ReplicationMetricsMaintTask", + + "org.apache.hadoop.hive.metastore.ReplicationMetricsMaintTask" + "," + + "org.apache.hadoop.hive.metastore.StatisticsManagementTask", "Comma separated list of tasks that will be started in separate threads. These will " + "always be started, regardless of whether the metastore is running in embedded mode " + "or in server mode. They must implement " + METASTORE_TASK_THREAD_CLASS), diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/StatisticsManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/StatisticsManagementTask.java new file mode 100644 index 000000000000..077c0aa8a44e --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/StatisticsManagementTask.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.DeleteColumnStatisticsRequest; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jdo.PersistenceManager; +import javax.jdo.Query; + +/** + * Statistics management task is primarily responsible for auto deletion of table column stats based on a certain frequency + * + * If some table or partition column statistics are older than the configured retention interval + * (MetastoreConf.ConfVars.STATISTICS_RETENTION_PERIOD), they are deleted when this metastore task runs periodically. 
 + */ +public class StatisticsManagementTask extends ObjectStore implements MetastoreTaskThread { + private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagementTask.class); + + // The 2 configs for users to set in the conf + // this is an optional table property, if this property does not exist for a table, then it is not excluded + public static final String STATISTICS_AUTO_DELETION_EXCLUDE_TBLPROPERTY = "statistics.auto.deletion.exclude"; + + private static final Lock lock = new ReentrantLock(); + + private Configuration conf; + + @Override + public long runFrequency(TimeUnit unit) { + return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.STATISTICS_MANAGEMENT_TASK_FREQUENCY, unit); + } + + @Override + public void setConf(Configuration configuration) { + // we modify conf in setupConf(), so we make a copy + this.conf = new Configuration(configuration); + super.setConf(configuration); + } + + @Override + public Configuration getConf() { + return conf; + } + + // what needs to be included in this run() method: + // get the "lastAnalyzed" information from TAB_COL_STATS and find all the tables need to be deleted + // delete all column stats + @Override + public void run() { + LOG.debug("Auto statistics deletion started. Cleaning up table/partition column statistics over the retention period."); + long retentionMillis = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.STATISTICS_RETENTION_PERIOD, TimeUnit.MILLISECONDS); + if (retentionMillis <= 0 || !MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.STATISTICS_AUTO_DELETION)) { + LOG.info("Statistics auto deletion is set to off currently."); + return; + } + if (!lock.tryLock()) { + return; + } + try { + long now = System.currentTimeMillis(); + long lastAnalyzedThreshold = (now - retentionMillis) / 1000; + + String filter = "lastAnalyzed < threshold"; + String paramStr = "long threshold"; + + PersistenceManager pm = getPersistenceManager(); + boolean committed = false; + openTransaction(); // open JDO transaction + try { + Query q = null; + try { + q = pm.newQuery(MTableColumnStatistics.class); + q.setFilter(filter); + q.declareParameters(paramStr); + q.setResult( + "table.database.name, " + + "table.tableName, " + + "partitionName, " + + "table.parameters.get(\"" + STATISTICS_AUTO_DELETION_EXCLUDE_TBLPROPERTY + "\")" + ); + @SuppressWarnings("unchecked") + List rows = (List) q.execute(lastAnalyzedThreshold); + + try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) { + for (Object[] row : rows) { + String dbName = (String) row[0]; + String tblName = (String) row[1]; + String partName = (String) row[2]; + String excludeVal = (String) row[3]; + + if (excludeVal != null) { + LOG.info("Skipping auto deletion of stats for table {}.{} due to STATISTICS_AUTO_DELETION_EXCLUDE_TBLPROPERTY property being set on the table.", dbName, tblName); + continue; + } + DeleteColumnStatisticsRequest request = new DeleteColumnStatisticsRequest(dbName, tblName); + request.setEngine("hive"); + request.setTableLevel(partName == null); + msc.deleteColumnStatistics(request); + } + } + } finally { + if (q != null) { + q.closeAll(); + } + } + committed = commitTransaction(); + } finally { + if (!committed) { + rollbackTransaction(); + } + } + } catch (Exception e) { + LOG.error("Error during statistics auto deletion", e); + } finally { + lock.unlock(); + } + } +} diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestStatisticsManagement.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestStatisticsManagement.java new file mode 100644 index 000000000000..e1ba5b1f84cb --- /dev/null +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestStatisticsManagement.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
 + * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore; + +import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; +import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder; +import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; +import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; +import org.apache.thrift.TException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import javax.jdo.PersistenceManager; +import javax.jdo.Query; + +@Category(MetastoreUnitTest.class) 
+public class TestStatisticsManagement { + + private IMetaStoreClient client; + private Configuration conf; + + @Before + public void setUp() throws Exception { + conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setVar(conf, ConfVars.METASTORE_METADATA_TRANSFORMER_CLASS, " "); + MetaStoreTestUtils.setConfForStandloneMode(conf); + conf.setBoolean(ConfVars.MULTITHREADED.getVarname(), false); + conf.setBoolean(ConfVars.HIVE_IN_TEST.getVarname(), true); + + // enable stats auto deletion, set up a short retention so threshold check triggers easily + MetastoreConf.setBoolVar(conf, ConfVars.STATISTICS_AUTO_DELETION, true); + MetastoreConf.setTimeVar(conf, ConfVars.STATISTICS_RETENTION_PERIOD, 1, TimeUnit.DAYS); + + MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf); + TestTxnDbUtil.setConfValues(conf); + TestTxnDbUtil.prepDb(conf); + + client = new HiveMetaStoreClient(conf); + } + + @After + public void tearDown() throws Exception { + if (client != null) { + // Drop any leftover databases, similar to TestPartitionManagement.java + List dbs = client.getAllDatabases(DEFAULT_CATALOG_NAME); + for (String db : dbs) { + if (!db.equalsIgnoreCase(DEFAULT_DATABASE_NAME)) { + client.dropDatabase(DEFAULT_CATALOG_NAME, db, true, false, true); + } + } + } + try { + if (client != null) { + client.close(); + } + } finally { + client = null; + } + } + + @Test + public void testExpiredTableColStatsAreDeleted() throws Exception { + String dbName = "stats_db1"; + String tableName = "tbl1"; + createDbAndTable(dbName, tableName, false); + // create a column stats entry (table-level) + writeTableLevelColStats(dbName, tableName, "c1"); + // ensure stats exists + assertHasTableColStats(dbName, tableName, "c1"); + // make lastAnalyzed older than the threshold + makeAllTableColStatsOlderThanRetention(dbName, tableName); + + runStatisticsManagementTask(conf); + assertNoTableColStats(dbName, tableName, "c1"); + } + + @Test + public void 
testExcludedTableStatsAreNotDeleted() throws Exception { + String dbName = "stats_db2"; + String tableName = "tbl2"; + // Create a database and table that ARE excluded from auto deletion. + createDbAndTable(dbName, tableName, true); + writeTableLevelColStats(dbName, tableName, "c1"); + assertHasTableColStats(dbName, tableName, "c1"); + + // Manually set lastAnalyzed to a very old timestamp so it would normally be expired. + makeAllTableColStatsOlderThanRetention(dbName, tableName); + + runStatisticsManagementTask(conf); + + // Verify that stats are still present because the table is excluded. + assertHasTableColStats(dbName, tableName, "c1"); + } + + private void createDbAndTable(String dbName, String tableName, boolean exclude) throws Exception { + Database db; + if (!DEFAULT_DATABASE_NAME.equals(dbName)) { + db = new DatabaseBuilder() + .setName(dbName) + .setCatalogName(DEFAULT_CATALOG_NAME) + .create(client, conf); + } else { + db = client.getDatabase(DEFAULT_CATALOG_NAME, DEFAULT_DATABASE_NAME); + } + + // Build a simple test table with two columns. + TableBuilder tb = new TableBuilder() + .inDb(db) + .setTableName(tableName) + .addCol("c1", "double") + .addCol("c2", "string") + .setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat") + .setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"); + + Table t = tb.build(conf); + + // If requested, mark this table as excluded from automatic stats deletion. + if (exclude) { + t.getParameters().put(StatisticsManagementTask.STATISTICS_AUTO_DELETION_EXCLUDE_TBLPROPERTY, "true"); + } + client.createTable(t); + client.flushCache(); + } + + private void writeTableLevelColStats(String db, String tbl, String col) throws TException { + // Create a stats object for one column. + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName(col); + obj.setColType("double"); + + // Fill in minimal double-column statistics data. 
+ DoubleColumnStatsData doubleData = new DoubleColumnStatsData(); + doubleData.setNumNulls(0); + doubleData.setNumDVs(1); + doubleData.setLowValue(1.0); + doubleData.setHighValue(1.0); + + ColumnStatisticsData data = new ColumnStatisticsData(); + data.setDoubleStats(doubleData); + obj.setStatsData(data); + + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(true, db, tbl); + desc.setCatName("hive"); + cs.setStatsDesc(desc); + cs.addToStatsObj(obj); + + client.updateTableColumnStatistics(cs); + } + + private void assertHasTableColStats(String db, String tbl, String col) throws TException { + List objs = client.getTableColumnStatistics(db, tbl, List.of(col), "hive"); + assertTrue("Expected stats for " + db + "." + tbl + "." + col, objs != null && !objs.isEmpty()); + } + + private void assertNoTableColStats(String db, String tbl, String col) throws TException { + try { + List objs = client.getTableColumnStatistics(db, tbl, List.of(col), "hive"); + assertTrue("Expected no stats for " + db + "." + tbl + "." + col, objs == null || objs.isEmpty()); + } catch (NoSuchObjectException e) { + // acceptable: server may throw if stats absent depending on impl + } + } + + private void makeAllTableColStatsOlderThanRetention(String db, String tbl) throws Exception { + // We update via ObjectStore/PM directly to avoid relying on params "lastAnalyzed". + RawStore ms = HMSHandler.getMSForConf(conf); + ObjectStore os = (ObjectStore) ms; + os.setConf(conf); + + // Compute an old timestamp in seconds, here we use 400 days ago. + long oldSeconds = (System.currentTimeMillis() - TimeUnit.DAYS.toMillis(400)) / 1000; + + // NOTE: exact JDO classes/field paths sometimes vary; adjust filter if needed based on MTableColumnStatistics mapping + PersistenceManager pm = os.getPersistenceManager(); + Query q = null; + try { + // Query MTableColumnStatistics rows for the target table. 
+ q = pm.newQuery(org.apache.hadoop.hive.metastore.model.MTableColumnStatistics.class); + q.setFilter("table.tableName == t && table.database.name == d"); + q.declareParameters("java.lang.String t, java.lang.String d"); + @SuppressWarnings("unchecked") + List rows = + (List) q.execute(tbl, db); + // Make all matching column stats rows to look old/expired. + for (org.apache.hadoop.hive.metastore.model.MTableColumnStatistics r : rows) { + r.setLastAnalyzed(oldSeconds); + } + pm.flush(); + } finally { + if (q != null) { + q.closeAll(); + } + pm.close(); + } + } + + private void runStatisticsManagementTask(Configuration conf) { + StatisticsManagementTask task = new StatisticsManagementTask(); + task.setConf(conf); + task.run(); + } +}