diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 3993dc5e8dfb..0abe160094da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -143,8 +143,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl { * rollback (assuming the statement is well implemented)). * * This is done so that all commands run in a transaction which simplifies implementation and - * allows a simple implementation of multi-statement txns which don't require a lock manager - * capable of deadlock detection. (todo: not fully implemented; elaborate on how this LM works) + * allows a simple implementation of multi-statement txns. Deadlocks among waiting ACID + * txns are resolved metastore-side by {@code DeadlockDetectorService}, not by this client. * * Also, critically important, ensuring that everything runs in a transaction assigns an order * to all operations in the system - needed for replication/DR. @@ -323,10 +323,8 @@ private void markExplicitTransaction(QueryPlan queryPlan) throws LockException { } /** * Ensures that the current SQL statement is appropriate for the current state of the - * Transaction Manager (e.g. can call commit unless you called start transaction) + * Transaction Manager (e.g. can call commit unless you called start transaction). * - * Note that support for multi-statement txns is a work-in-progress so it's only supported in - * HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST. 
* @param queryPlan * @throws LockException */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 1aec2ac86091..2d01ac6136bd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -13034,10 +13034,6 @@ else if(ast.getChild(0).getType() == HiveParser.TOK_FALSE) { case HiveParser.TOK_START_TRANSACTION: case HiveParser.TOK_COMMIT: case HiveParser.TOK_ROLLBACK: - if(!(conf.getBoolVar(ConfVars.HIVE_IN_TEST) || conf.getBoolVar(ConfVars.HIVE_IN_TEZ_TEST))) { - throw new IllegalStateException(HiveOperation.operationForToken(ast.getToken().getType()) + - " is not supported yet."); - } queryState.setCommandType(HiveOperation.operationForToken(ast.getToken().getType())); return false; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index fad6593a6360..e022e5c297dc 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -37,9 +37,12 @@ import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.metrics.Metrics; +import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.metastore.txn.service.CompactionHouseKeeperService; import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService; +import org.apache.hadoop.hive.metastore.txn.service.DeadlockDetectorService; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.QueryState; @@ -4747,4 +4750,429 @@ public 
void testMaterializedViewRebuildLockForSameMV() throws Exception { // cleanup driver.run("drop materialized view mv_tab_acid"); } + + /** + * N-party deadlock cycle: txn_i holds table_i, then requests table_{(i+1) % n}, forming + * a complete wait-for cycle. The detector must abort the youngest member (txn_{n-1}) and + * leave the rest live; the predecessor's waiting lock must become acquirable. Tarjan's + * SCC handles cycles of any size uniformly, so {n=2, 3} cover the canonical case and a + * non-trivial SCC respectively. + */ + @Test + public void testDeadlockDetectionMultiParty() throws Exception { + for (int n : new int[]{2, 3}) { + assertCycleAbortsYoungest(n); + } + } + + private void assertCycleAbortsYoungest(int n) throws Exception { + String[] tables = new String[n]; + for (int i = 0; i < n; i++) { + tables[i] = "DLM" + n + "_T" + i; + } + dropTable(tables); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true); + for (String t : tables) { + driver.run("create table " + t + " (a int, b int) clustered by(b) into 2 buckets " + + "stored as orc TBLPROPERTIES ('transactional'='true')"); + } + + HiveTxnManager[] mgrs = new HiveTxnManager[n]; + long[] ids = new long[n]; + for (int i = 0; i < n; i++) { + mgrs[i] = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(mgrs[i]); + mgrs[i].openTxn(ctx, "Txn" + i); + ids[i] = mgrs[i].getCurrentTxnId(); + driver.compileAndRespond("update " + tables[i] + " set a = 1 where b = 1", true); + mgrs[i].acquireLocks(driver.getPlan(), ctx, "Txn" + i); + } + for (int i = 1; i < n; i++) { + Assert.assertTrue("Txn IDs must be monotonically increasing", ids[i] > ids[i - 1]); + } + + for (int i = 0; i < n; i++) { + swapTxnManager(mgrs[i]); + driver.compileAndRespond("update " + tables[(i + 1) % n] + " set a = 2 where b = 2", true); + ((DbTxnManager) mgrs[i]).acquireLocks(driver.getPlan(), ctx, "Txn" + i, false); + } + + runDeadlockDetector(); + + int victim = n - 1; + LockException ex = null; + try 
{ + mgrs[victim].heartbeat(); + } catch (LockException e) { + ex = e; + } + Assert.assertNotNull(n + "-party: heartbeat on victim Txn" + victim + " should fail", ex); + Assert.assertTrue(n + "-party: cause should be TxnAbortedException, was: " + ex, + ex.getCause() instanceof org.apache.hadoop.hive.metastore.api.TxnAbortedException); + for (int i = 0; i < victim; i++) { + mgrs[i].heartbeat(); + } + + // Predecessor's waiting lock (on the victim's table) is now acquirable. + int predecessor = victim - 1; + swapTxnManager(mgrs[predecessor]); + List<ShowLocksResponseElement> locks = getLocks(mgrs[predecessor]); + long waitingLockId = findWaitingLockId(locks, tables[victim], ids[predecessor]); + Assert.assertTrue(n + "-party: predecessor's waiting lock should exist", waitingLockId > 0); + LockState state = ((DbLockManager) mgrs[predecessor].getLockManager()).checkLock(waitingLockId); + Assert.assertEquals(n + "-party: predecessor's lock should now be acquired", + LockState.ACQUIRED, state); + + for (int i = 0; i < victim; i++) { + mgrs[i].rollbackTxn(); + } + for (HiveTxnManager m : mgrs) { + m.closeTxnManager(); + } + swapTxnManager(txnMgr); + } + + /** + * Linear wait chain (no cycle): A holds lock, B waits for A. + * The detector must NOT abort either transaction. 
+ */ + @Test + public void testNoFalsePositiveLinearChain() throws Exception { + dropTable(new String[]{"NF_T1"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true); + driver.run("create table NF_T1 (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + + HiveTxnManager txnMgrA = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgrA); + txnMgrA.openTxn(ctx, "TxnA"); + driver.compileAndRespond("update NF_T1 set a = 1 where b = 1", true); + txnMgrA.acquireLocks(driver.getPlan(), ctx, "TxnA"); + + HiveTxnManager txnMgrB = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgrB); + txnMgrB.openTxn(ctx, "TxnB"); + driver.compileAndRespond("update NF_T1 set a = 2 where b = 2", true); + ((DbTxnManager) txnMgrB).acquireLocks(driver.getPlan(), ctx, "TxnB", false); + + List<ShowLocksResponseElement> locks = getLocks(txnMgrB); + long txnBWaitingLockId = -1; + for (ShowLocksResponseElement lock : locks) { + if (lock.getState() == LockState.WAITING) { + txnBWaitingLockId = lock.getLockid(); + break; + } + } + Assert.assertTrue("Should have found B's waiting lock", txnBWaitingLockId > 0); + + long deadlockedBefore = getDeadlockedCounter(); + runDeadlockDetector(); + Assert.assertEquals("Linear chain must not increment deadlock counter", + deadlockedBefore, getDeadlockedCounter()); + + // Neither txn should have been aborted; B remains WAITING. + txnMgrA.heartbeat(); + txnMgrB.heartbeat(); + LockState state = ((DbLockManager) txnMgrB.getLockManager()).checkLock(txnBWaitingLockId); + Assert.assertEquals("B should still be WAITING (no deadlock)", LockState.WAITING, state); + + txnMgrA.rollbackTxn(); + txnMgrB.rollbackTxn(); + txnMgrA.closeTxnManager(); + txnMgrB.closeTxnManager(); + swapTxnManager(txnMgr); + } + + /** + * The headline soundness test: a young transaction {@code D} is blocked by a member of + * an established two-party cycle but is itself NOT in the cycle. 
The detector must + * abort the cycle's youngest member ({@code B}) — never {@code D}, even though + * {@code D} has the highest txn ID overall. This pins the fix for the original + * detector's seed-based victim-selection bug. + */ + @Test + public void testInnocentWaiterNotAborted() throws Exception { + dropTable(new String[]{"IW_T1", "IW_T2"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true); + driver.run("create table IW_T1 (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table IW_T2 (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + + // A holds IW_T1 + HiveTxnManager txnMgrA = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgrA); + txnMgrA.openTxn(ctx, "TxnA"); + long txnIdA = txnMgrA.getCurrentTxnId(); + driver.compileAndRespond("update IW_T1 set a = 1 where b = 1", true); + txnMgrA.acquireLocks(driver.getPlan(), ctx, "TxnA"); + + // B holds IW_T2 + HiveTxnManager txnMgrB = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgrB); + txnMgrB.openTxn(ctx, "TxnB"); + long txnIdB = txnMgrB.getCurrentTxnId(); + driver.compileAndRespond("update IW_T2 set a = 1 where b = 1", true); + txnMgrB.acquireLocks(driver.getPlan(), ctx, "TxnB"); + + // A waits for IW_T2 (-> B); B waits for IW_T1 (-> A): cycle {A,B}. + swapTxnManager(txnMgrA); + driver.compileAndRespond("update IW_T2 set a = 2 where b = 2", true); + ((DbTxnManager) txnMgrA).acquireLocks(driver.getPlan(), ctx, "TxnA", false); + swapTxnManager(txnMgrB); + driver.compileAndRespond("update IW_T1 set a = 2 where b = 2", true); + ((DbTxnManager) txnMgrB).acquireLocks(driver.getPlan(), ctx, "TxnB", false); + + // D — newest txn, NOT in the cycle. D simply waits on IW_T1 (held by A). 
+ HiveTxnManager txnMgrD = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgrD); + txnMgrD.openTxn(ctx, "TxnD"); + long txnIdD = txnMgrD.getCurrentTxnId(); + Assert.assertTrue("D should have the highest txn ID", txnIdD > txnIdB && txnIdB > txnIdA); + driver.compileAndRespond("update IW_T1 set a = 3 where b = 3", true); + ((DbTxnManager) txnMgrD).acquireLocks(driver.getPlan(), ctx, "TxnD", false); + + runDeadlockDetector(); + + // B is the youngest cycle member -> aborted. A and D survive. + Assert.assertTrue("B (younger of the two cycle members) should be aborted", + heartbeatThrowsTxnAborted(txnMgrB)); + txnMgrA.heartbeat(); + txnMgrD.heartbeat(); + + txnMgrA.rollbackTxn(); + txnMgrD.rollbackTxn(); + txnMgrA.closeTxnManager(); + txnMgrB.closeTxnManager(); + txnMgrD.closeTxnManager(); + swapTxnManager(txnMgr); + } + + /** + * The detector kill switch ({@code TXN_DEADLOCK_DETECTOR_ENABLED=false}) must + * short-circuit {@code run()} and prevent any abort, even on a real cycle. This is the + * production "stop the bleeding" lever; a regression that ignores the flag would let a + * misbehaving detector keep killing user txns until binary rollback. 
+ */ + @Test + public void testDetectorDisabledKillSwitch() throws Exception { + dropTable(new String[]{"KS_T1", "KS_T2"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true); + boolean originalEnabled = MetastoreConf.getBoolVar(conf, + MetastoreConf.ConfVars.TXN_DEADLOCK_DETECTOR_ENABLED); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_DEADLOCK_DETECTOR_ENABLED, false); + HiveTxnManager txnMgrA = null; + HiveTxnManager txnMgrB = null; + try { + driver.run("create table KS_T1 (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table KS_T2 (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + txnMgrA = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgrA); + txnMgrA.openTxn(ctx, "TxnA"); + driver.compileAndRespond("update KS_T1 set a = 1 where b = 1", true); + txnMgrA.acquireLocks(driver.getPlan(), ctx, "TxnA"); + txnMgrB = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgrB); + txnMgrB.openTxn(ctx, "TxnB"); + driver.compileAndRespond("update KS_T2 set a = 1 where b = 1", true); + txnMgrB.acquireLocks(driver.getPlan(), ctx, "TxnB"); + swapTxnManager(txnMgrA); + driver.compileAndRespond("update KS_T2 set a = 2 where b = 2", true); + ((DbTxnManager) txnMgrA).acquireLocks(driver.getPlan(), ctx, "TxnA", false); + swapTxnManager(txnMgrB); + driver.compileAndRespond("update KS_T1 set a = 2 where b = 2", true); + ((DbTxnManager) txnMgrB).acquireLocks(driver.getPlan(), ctx, "TxnB", false); + + runDeadlockDetector(); + + // Both txns must still be alive — kill switch must skip the scan entirely. 
+ txnMgrA.heartbeat(); + txnMgrB.heartbeat(); + } finally { + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_DEADLOCK_DETECTOR_ENABLED, + originalEnabled); + if (txnMgrA != null) { + try { txnMgrA.rollbackTxn(); } catch (Exception ignored) { } + txnMgrA.closeTxnManager(); + } + if (txnMgrB != null) { + try { txnMgrB.rollbackTxn(); } catch (Exception ignored) { } + txnMgrB.closeTxnManager(); + } + swapTxnManager(txnMgr); + } + } + + /** + * Re-running the detector immediately after a successful abort must be a no-op: the + * cycle is already broken and re-detection must not fire phantom aborts or double-count + * the metric. Production runs the detector on a 5-second timer, so this re-entry path + * is exercised every interval. + */ + @Test + public void testIdempotentReRunReturnsZero() throws Exception { + dropTable(new String[]{"IR_T1", "IR_T2"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true); + driver.run("create table IR_T1 (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table IR_T2 (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + + HiveTxnManager txnMgrA = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgrA); + txnMgrA.openTxn(ctx, "TxnA"); + driver.compileAndRespond("update IR_T1 set a = 1 where b = 1", true); + txnMgrA.acquireLocks(driver.getPlan(), ctx, "TxnA"); + HiveTxnManager txnMgrB = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgrB); + txnMgrB.openTxn(ctx, "TxnB"); + driver.compileAndRespond("update IR_T2 set a = 1 where b = 1", true); + txnMgrB.acquireLocks(driver.getPlan(), ctx, "TxnB"); + swapTxnManager(txnMgrA); + driver.compileAndRespond("update IR_T2 set a = 2 where b = 2", true); + ((DbTxnManager) txnMgrA).acquireLocks(driver.getPlan(), ctx, "TxnA", false); + swapTxnManager(txnMgrB); + 
driver.compileAndRespond("update IR_T1 set a = 2 where b = 2", true); + ((DbTxnManager) txnMgrB).acquireLocks(driver.getPlan(), ctx, "TxnB", false); + + runDeadlockDetector(); + Assert.assertTrue("First run must abort B", heartbeatThrowsTxnAborted(txnMgrB)); + + long deadlockedAfterFirst = getDeadlockedCounter(); + runDeadlockDetector(); + // Second run: cycle already broken, A still alive, metric must not move. + txnMgrA.heartbeat(); + Assert.assertEquals("Re-running the detector must not increment the deadlock counter", + deadlockedAfterFirst, getDeadlockedCounter()); + + txnMgrA.rollbackTxn(); + txnMgrA.closeTxnManager(); + txnMgrB.closeTxnManager(); + swapTxnManager(txnMgr); + } + + /** + * Exact-delta assertion on {@code TOTAL_NUM_DEADLOCKED_TXNS}. A regression that + * over-counts (e.g. incrementing inside a retry loop) or under-counts (e.g. forgetting + * the metric on the new abort path) would slip past the existing tests, all of which + * only assert "victim got aborted" and not "metric moved by exactly one." + */ + @Test + public void testDeadlockMetricIncrement() throws Exception { + dropTable(new String[]{"DM_T1", "DM_T2"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true); + boolean originalAcidExt = MetastoreConf.getBoolVar(conf, + MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON); + boolean originalMetrics = MetastoreConf.getBoolVar(conf, + MetastoreConf.ConfVars.METRICS_ENABLED); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON, true); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true); + // Force a fresh Metrics registry. Without METRICS_ENABLED + a re-init, every + // getOrCreateCounter call returns the same shared `dummyCounter`, so increments to + // any counter (e.g. AbortTxnsFunction's TOTAL_NUM_ABORTED_TXNS) leak into our reading + // of TOTAL_NUM_DEADLOCKED_TXNS — making the delta reflect *all* metric work, not just + // ours. 
+ Metrics.shutdown(); + Metrics.initialize(conf); + HiveTxnManager txnMgrA = null; + HiveTxnManager txnMgrB = null; + try { + driver.run("create table DM_T1 (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + driver.run("create table DM_T2 (a int, b int) " + + "clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + txnMgrA = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgrA); + txnMgrA.openTxn(ctx, "TxnA"); + driver.compileAndRespond("update DM_T1 set a = 1 where b = 1", true); + txnMgrA.acquireLocks(driver.getPlan(), ctx, "TxnA"); + txnMgrB = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + swapTxnManager(txnMgrB); + txnMgrB.openTxn(ctx, "TxnB"); + driver.compileAndRespond("update DM_T2 set a = 1 where b = 1", true); + txnMgrB.acquireLocks(driver.getPlan(), ctx, "TxnB"); + swapTxnManager(txnMgrA); + driver.compileAndRespond("update DM_T2 set a = 2 where b = 2", true); + ((DbTxnManager) txnMgrA).acquireLocks(driver.getPlan(), ctx, "TxnA", false); + swapTxnManager(txnMgrB); + driver.compileAndRespond("update DM_T1 set a = 2 where b = 2", true); + ((DbTxnManager) txnMgrB).acquireLocks(driver.getPlan(), ctx, "TxnB", false); + + long before = getDeadlockedCounter(); + runDeadlockDetector(); + long after = getDeadlockedCounter(); + Assert.assertEquals("Detector must increment the deadlock counter exactly once", + before + 1, after); + } finally { + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON, + originalAcidExt); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, originalMetrics); + // Restore Metrics to its prior state so subsequent tests in this class are not + // observing a registry that this test re-initialized. 
+ Metrics.shutdown(); + if (originalMetrics) { + Metrics.initialize(conf); + } + if (txnMgrA != null) { + try { txnMgrA.rollbackTxn(); } catch (Exception ignored) { } + txnMgrA.closeTxnManager(); + } + if (txnMgrB != null) { + try { txnMgrB.rollbackTxn(); } catch (Exception ignored) { } + txnMgrB.closeTxnManager(); + } + swapTxnManager(txnMgr); + } + } + + /** + * Synchronously runs the deadlock detector. In production this runs on a periodic timer + * inside the metastore leader; tests bypass the schedule and the cluster mutex (via + * {@code enforceMutex(false)}) to deterministically observe the effect of one scan. + */ + private void runDeadlockDetector() { + DeadlockDetectorService detector = new DeadlockDetectorService(); + detector.setConf(conf); + detector.enforceMutex(false); + detector.run(); + } + + /** + * Returns true iff a heartbeat() on the given txn manager fails because the txn was + * aborted. Preferred over commitTxn() because the assertion is non-destructive — the + * txn stays in its current state if it's still open. + */ + private static boolean heartbeatThrowsTxnAborted(HiveTxnManager txnMgr) { + try { + txnMgr.heartbeat(); + return false; + } catch (LockException e) { + return e.getCause() instanceof org.apache.hadoop.hive.metastore.api.TxnAbortedException; + } + } + + /** + * Reads the current value of the deadlock-aborted-txn metric. The counter is + * process-global; tests must use snapshot-and-delta, not absolute values. + */ + private static long getDeadlockedCounter() { + return Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_DEADLOCKED_TXNS).getCount(); + } + + /** + * Locates the {@code WAITING} lock on the given table for the given txn ID. Returns + * {@code -1} if none found. 
+ */ + private static long findWaitingLockId(List locks, String table, + long txnId) { + for (ShowLocksResponseElement lock : locks) { + if (lock.getState() == LockState.WAITING && table.equalsIgnoreCase(lock.getTablename()) + && lock.getTxnid() == txnId) { + return lock.getLockid(); + } + } + return -1; + } } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 10015f74837c..6454d74ec035 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -106,6 +106,9 @@ public class MetastoreConf { static final String ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS = "org.apache.hadoop.hive.metastore.txn.service.AcidOpenTxnsCounterService"; @VisibleForTesting + static final String DEADLOCK_DETECTOR_SERVICE_CLASS = + "org.apache.hadoop.hive.metastore.txn.service.DeadlockDetectorService"; + @VisibleForTesting static final String ICEBERG_TABLE_SNAPSHOT_EXPIRY_SERVICE_CLASS = "org.apache.iceberg.mr.hive.metastore.task.IcebergHouseKeeperService"; public static final String METASTORE_AUTHENTICATION_LDAP_USERMEMBERSHIPKEY_NAME = @@ -313,6 +316,17 @@ public enum ConfVars { ACID_TXN_CLEANER_INTERVAL("metastore.acid.txn.cleaner.interval", "hive.metastore.acid.txn.cleaner.interval", 10, TimeUnit.SECONDS, "Time interval describing how often aborted and committed txns are cleaned."), + TXN_DEADLOCK_DETECTOR_ENABLED("metastore.txn.deadlock.detector.enabled", + "hive.metastore.txn.deadlock.detector.enabled", true, + "Run a metastore background thread that scans HIVE_LOCKS for wait-for cycles among " + + "ACID transactions. 
The youngest eligible transaction in each cycle is aborted " + + "with ABORT_DEADLOCK; REPL_CREATED and SOFT_DELETE are protected. When false, " + + "cycles resolve only via lock/txn timeouts. Toggling requires a metastore " + + "restart; this setting is not exposed via setMetaConf."), + TXN_DEADLOCK_DETECTOR_INTERVAL("metastore.txn.deadlock.detector.interval", + "hive.metastore.txn.deadlock.detector.interval", 5, TimeUnit.SECONDS, + "Interval between deadlock-detector scans. Lower values shorten detection latency " + + "at the cost of extra metastore DB load."), ADDED_JARS("metastore.added.jars.path", "hive.added.jars.path", "", "This an internal parameter."), AGGREGATE_STATS_CACHE_CLEAN_UNTIL("metastore.aggregate.stats.cache.clean.until", @@ -1534,6 +1548,7 @@ public enum ConfVars { COMPACTION_HOUSEKEEPER_SERVICE_CLASS + "," + ACID_TXN_CLEANER_SERVICE_CLASS + "," + ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS + "," + + DEADLOCK_DETECTOR_SERVICE_CLASS + "," + MATERIALZIATIONS_REBUILD_LOCK_CLEANER_TASK_CLASS + "," + PARTITION_MANAGEMENT_TASK_CLASS + "," + ICEBERG_TABLE_SNAPSHOT_EXPIRY_SERVICE_CLASS, diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java index 46512ab3c4ba..8e75e00960f9 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java @@ -82,6 +82,13 @@ public class MetricsConstants { public static final String TOTAL_NUM_ABORTED_TXNS = "total_num_aborted_transactions"; public static final String TOTAL_NUM_COMMITTED_TXNS = "total_num_committed_transactions"; public static final String TOTAL_NUM_TIMED_OUT_TXNS = "total_num_timed_out_transactions"; + public static final String 
TOTAL_NUM_DEADLOCKED_TXNS = "total_num_deadlocked_transactions"; + public static final String TOTAL_NUM_DEADLOCK_DETECTOR_GRAPH_TOO_LARGE = + "total_num_deadlock_detector_graph_too_large"; + public static final String TOTAL_NUM_DEADLOCK_DETECTOR_SCAN_FAILURES = + "total_num_deadlock_detector_scan_failures"; + public static final String DEADLOCK_DETECTOR_SCAN_DURATION = + "deadlock_detector_scan_duration"; public static final String OPEN_CONNECTIONS = "open_connections"; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnErrorMsg.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnErrorMsg.java index 6d7181db667b..3b3872881bc1 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnErrorMsg.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnErrorMsg.java @@ -33,6 +33,7 @@ public enum TxnErrorMsg { ABORT_COMPACTION_TXN(50006, "compaction transaction abort"), ABORT_MSCK_TXN(50007, "msck transaction abort"), ABORT_MIGRATION_TXN(50008, "managed migration transaction abort"), + ABORT_DEADLOCK(50009, "deadlock detected"), // Replication related aborts - 51000 - 51099 ABORT_DEFAULT_REPL_TXN(51000, "Replication:" + diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index a847e01aca4d..ea47caabdf6d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -1041,6 +1041,12 @@ public void performTimeOuts() { new PerformTimeoutsFunction(timeout, replicationTxnTimeout, transactionalListeners).execute(jdbcResource); } + @Override + public 
int performDeadlockDetection() throws MetaException { + Integer aborted = new DeadlockDetectionFunction(transactionalListeners).execute(jdbcResource); + return aborted == null ? 0 : aborted; + } + @Override public void countOpenTxns() throws MetaException { int openTxns = jdbcResource.execute(new CountOpenTxnsHandler()); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 138acedc8547..aa8ffdb7ac51 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -107,7 +107,7 @@ public interface TxnStore extends Configurable { enum MUTEX_KEY { Initiator, Cleaner, HouseKeeper, IcebergHouseKeeper, TxnCleaner, - CompactionScheduler, MaterializationRebuild + CompactionScheduler, MaterializationRebuild, DeadlockDetector } // Compactor states (Should really be enum) String INITIATED_RESPONSE = "initiated"; @@ -565,6 +565,17 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old @RetrySemantics.Idempotent void performTimeOuts(); + /** + * Scan HIVE_LOCKS for wait-for cycles and abort the youngest eligible txn in each + * (REPL_CREATED/SOFT_DELETE protected). Driven by {@code DeadlockDetectorService}; tests + * may invoke synchronously. + * + * @return number of transactions aborted as deadlock victims + */ + @Transactional(POOL_TX) + @RetrySemantics.Idempotent + int performDeadlockDetection() throws MetaException; + /** * This will look through the completed_txn_components table and look for partitions or tables * that may be ready for compaction. 
Also, look through txns and txn_components tables for
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/DeadlockDetectionFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/DeadlockDetectionFunction.java
new file mode 100644
index 000000000000..239ba7f72012
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/DeadlockDetectionFunction.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn.jdbc.functions;
+
+import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnErrorMsg;
+import org.apache.hadoop.hive.metastore.txn.entities.LockInfo;
+import org.apache.hadoop.hive.metastore.txn.entities.TxnWriteDetails;
+import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
+import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionContext;
+import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction;
+import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsMappingForTxnIdsHandler;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+
+import java.sql.ResultSet;
+import java.sql.Types;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.metastore.txn.TxnHandler.notifyCommitOrAbortEvent;
+
+/**
+ * Single-snapshot scan: loads the wait-for graph from {@code HIVE_LOCKS} in one query, runs
+ * Tarjan's SCC, and aborts the youngest eligible txn (wait-die) in each cycle with
+ * {@link TxnErrorMsg#ABORT_DEADLOCK}. See {@link #isProtectedFromAbort} for the protected
+ * set; if every cycle member is protected the cycle is left to timeout.
+ */
+public class DeadlockDetectionFunction implements TransactionalFunction<Integer> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DeadlockDetectionFunction.class);
+
+  /**
+   * Hard cap on edges loaded per scan. Real cycles are 2-5 txns; a graph this large means
+   * a metastore-wide pile-up, not a single deadlock. The scan is skipped and
+   * {@link MetricsConstants#TOTAL_NUM_DEADLOCK_DETECTOR_GRAPH_TOO_LARGE} fires.
+   */
+  private static final int MAX_GRAPH_SIZE = 10_000;
+
+  /**
+   * Loads WAITER -> BLOCKER edges plus the waiter's {@link TxnType}. Read-side complement
+   * of {@code CheckLockFunction}: it writes both {@code HL_BLOCKEDBY_*} columns on the
+   * waiter row when it parks the lock, and NULLs them on acquire — so a NOT-NULL JOIN on
+   * the {@code (ext_id, int_id)} PK naturally drops acquired locks. The {@code TXNS} JOIN
+   * filters {@code HL_TXNID = 0} (autocommit reads, which can't form txn-level cycles) and
+   * captures the type in the same MVCC snapshot. No {@code TXN_STATE} filter: HIVE_LOCKS
+   * rows only exist for OPEN txns. {@code DISTINCT} dedups DB-side to avoid shipping the
+   * duplicate edges a multi-statement waiter generates.
+   */
+  // No SELECT keyword: addLimitClause prepends one and applies the dialect-correct row cap.
+  private static final String LOAD_WAIT_EDGES_SQL_BODY = """
+      DISTINCT "WAITER"."HL_TXNID" AS "WAITER_TXN",
+        "BLOCKER"."HL_TXNID" AS "BLOCKER_TXN",
+        "WTXN"."TXN_TYPE" AS "WAITER_TYPE"
+      FROM "HIVE_LOCKS" "WAITER"
+        INNER JOIN "HIVE_LOCKS" "BLOCKER"
+          ON "BLOCKER"."HL_LOCK_EXT_ID" = "WAITER"."HL_BLOCKEDBY_EXT_ID"
+          AND "BLOCKER"."HL_LOCK_INT_ID" = "WAITER"."HL_BLOCKEDBY_INT_ID"
+        INNER JOIN "TXNS" "WTXN"
+          ON "WAITER"."HL_TXNID" = "WTXN"."TXN_ID"
+      WHERE "WAITER"."HL_LOCK_STATE" = :waitingState
+        AND "WAITER"."HL_TXNID" <> "BLOCKER"."HL_TXNID"
+      """;
+
+  private final List<TransactionalMetaStoreEventListener> transactionalListeners;
+
+  public DeadlockDetectionFunction(
+      List<TransactionalMetaStoreEventListener> transactionalListeners) {
+    this.transactionalListeners = transactionalListeners;
+  }
+
+  @Override
+  public Integer execute(MultiDataSourceJdbcResource jdbcResource) throws MetaException {
+    Map<Long, Set<Long>> graph = new HashMap<>();
+    Map<Long, TxnType> txnTypes = new HashMap<>();
+    boolean truncated;
+    try {
+      truncated = loadGraph(jdbcResource, graph, txnTypes);
+    } catch (Exception e) {
+      LOG.warn("Deadlock detection scan failed: {}", e.getMessage(), e);
+      Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_DEADLOCK_DETECTOR_SCAN_FAILURES).inc();
+      return 0;
+    }
+    if (graph.isEmpty()) {
+      return 0;
+    }
+    if (truncated) {
+      LOG.warn("Deadlock detector skipping scan: wait-for graph exceeds {} edges. "
+          + "A pile-up this large is unlikely to be a single deadlock and needs operator attention.",
+          MAX_GRAPH_SIZE);
+      Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_DEADLOCK_DETECTOR_GRAPH_TOO_LARGE).inc();
+      return 0;
+    }
+
+    List<List<Long>> sccs = tarjanSCCs(graph);
+    TransactionContext txContext = jdbcResource.getTransactionManager().getActiveTransaction();
+    int totalAborted = 0;
+    for (List<Long> scc : sccs) {
+      // Tarjan emits a singleton SCC for every acyclic node; size < 2 means "not in a cycle".
+      if (scc.size() < 2) {
+        continue;
+      }
+      Long victim = pickVictim(scc, txnTypes);
+      if (victim == null) {
+        LOG.warn("Deadlock detected in cycle {} but no eligible victim exists "
+            + "(all members are protected txn types). Cycle will resolve via timeout.",
+            formatTxnList(scc));
+        continue;
+      }
+      // Per-cycle savepoint isolates a failing abort/notify from earlier successful aborts
+      // in this scan.
+      Object savepoint;
+      try {
+        savepoint = txContext.createSavepoint();
+      } catch (Exception e) {
+        // Must throw, not return: a bare return would commit earlier aborts while
+        // reporting 0, desynchronizing DB state from fired listeners.
+        Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_DEADLOCK_DETECTOR_SCAN_FAILURES).inc();
+        throw new MetaException("Failed to create savepoint for deadlock cycle "
+            + formatTxnList(scc) + ": " + e.getMessage());
+      }
+      try {
+        if (abortVictim(jdbcResource, victim, scc, txnTypes)) {
+          totalAborted++;
+        }
+        txContext.releaseSavepoint(savepoint);
+      } catch (Exception e) {
+        // Let rollbackToSavepoint propagate on failure: outer @Transactional must abort the
+        // whole scan rather than continue on a connection in undefined state. Matches the
+        // unguarded pattern in CommitTxnFunction / PerformTimeoutsFunction / HeartbeatTxnRange.
+        txContext.rollbackToSavepoint(savepoint);
+        LOG.warn("Deadlock victim {} abort or notify failed; rolled back this cycle: {}",
+            JavaUtils.txnIdToString(victim), e.getMessage(), e);
+        Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_DEADLOCK_DETECTOR_SCAN_FAILURES).inc();
+      }
+    }
+    return totalAborted;
+  }
+
+  /**
+   * Returns true iff the UPDATE actually flipped OPEN -> ABORTED. Throws on abort/notify
+   * failure; caller must hold a savepoint.
+   */
+  private boolean abortVictim(MultiDataSourceJdbcResource jdbcResource, long victim,
+      List<Long> scc, Map<Long, TxnType> txnTypes) throws MetaException {
+    LOG.info("Deadlock detected. Cycle: {}. Victim: {}",
+        formatTxnList(scc), JavaUtils.txnIdToString(victim));
+    // checkHeartbeat=false: deadlock victims are healthy (heartbeater still pinging);
+    // the heartbeat-aware UPDATE in PerformTimeouts would match zero rows here.
+    int aborted = new AbortTxnsFunction(Collections.singletonList(victim),
+        false, false, false, TxnErrorMsg.ABORT_DEADLOCK).execute(jdbcResource);
+    if (aborted != 1) {
+      LOG.info("Deadlock victim {} was already in a non-OPEN state when abort was attempted.",
+          JavaUtils.txnIdToString(victim));
+      return false;
+    }
+    // Order matters: notify before metric inc. The savepoint can roll back the UPDATE,
+    // but a Codahale counter inc() cannot be undone.
+    notifyAbort(jdbcResource, victim, txnTypes.get(victim));
+    Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_DEADLOCKED_TXNS).inc();
+    return true;
+  }
+
+  /**
+   * Loads up to {@code maxGraphSize} edges. Returns true iff truncated. Only waiter-side
+   * types are captured — every cycle member has an outgoing edge, so blocker-only leaves
+   * never need a type. Unknown {@link TxnType} ints map to {@code null}, treated as
+   * protected by {@link #pickVictim}.
+   */
+  private boolean loadGraph(MultiDataSourceJdbcResource jdbcResource,
+      Map<Long, Set<Long>> graph,
+      Map<Long, TxnType> txnTypes) throws MetaException {
+    // Server-side cap: ask for max+1 rows so an extra row signals truncation. Stops the DB
+    // from materializing a full DISTINCT self-join under contention.
+    String sql = jdbcResource.getSqlGenerator()
+        .addLimitClause(MAX_GRAPH_SIZE + 1, LOAD_WAIT_EDGES_SQL_BODY);
+    final boolean[] truncated = {false};
+    MapSqlParameterSource params = new MapSqlParameterSource()
+        .addValue("waitingState", String.valueOf(LockInfo.LOCK_WAITING), Types.CHAR);
+    jdbcResource.getJdbcTemplate().query(sql, params, (ResultSet rs) -> {
+      int loaded = 0;
+      while (rs.next()) {
+        if (loaded >= MAX_GRAPH_SIZE) {
+          truncated[0] = true;
+          return null;
+        }
+        long waiter = rs.getLong("WAITER_TXN");
+        long blocker = rs.getLong("BLOCKER_TXN");
+        graph.computeIfAbsent(waiter, k -> new HashSet<>()).add(blocker);
+        graph.computeIfAbsent(blocker, k -> new HashSet<>());
+        txnTypes.put(waiter, TxnType.findByValue(rs.getInt("WAITER_TYPE")));
+        loaded++;
+      }
+      return null;
+    });
+    return truncated[0];
+  }
+
+  /** Iterative (not recursive) Tarjan's SCC: a long wait chain must not blow the JVM stack. */
+  private static List<List<Long>> tarjanSCCs(Map<Long, Set<Long>> graph) {
+    Map<Long, Integer> index = new HashMap<>();
+    Map<Long, Integer> lowlink = new HashMap<>();
+    Set<Long> onStack = new HashSet<>();
+    Deque<Long> sccStack = new ArrayDeque<>();
+    List<List<Long>> result = new ArrayList<>();
+    int[] counter = {0};
+    Deque<DfsFrame> callStack = new ArrayDeque<>();
+
+    for (Long start : graph.keySet()) {
+      if (index.containsKey(start)) {
+        continue;
+      }
+      pushFrame(callStack, start, graph, index, lowlink, onStack, sccStack, counter);
+      while (!callStack.isEmpty()) {
+        DfsFrame frame = callStack.peek();
+        boolean recursed = false;
+        while (frame.blockers().hasNext()) {
+          long blocker = frame.blockers().next();
+          if (!index.containsKey(blocker)) {
+            pushFrame(callStack, blocker, graph, index, lowlink, onStack, sccStack, counter);
+            recursed = true;
+            break;
+          } else if (onStack.contains(blocker)) {
+            lowlink.put(frame.txnId(), Math.min(lowlink.get(frame.txnId()), index.get(blocker)));
+          }
+        }
+        if (recursed) {
+          continue;
+        }
+        if (lowlink.get(frame.txnId()).equals(index.get(frame.txnId()))) {
+          List<Long> scc = new ArrayList<>();
+          long popped;
+          do {
+            popped = sccStack.pop();
+            onStack.remove(popped);
+            scc.add(popped);
+          } while (popped != frame.txnId());
+          result.add(scc);
+        }
+        callStack.pop();
+        if (!callStack.isEmpty()) {
+          DfsFrame parent = callStack.peek();
+          lowlink.put(parent.txnId(),
+              Math.min(lowlink.get(parent.txnId()), lowlink.get(frame.txnId())));
+        }
+      }
+    }
+    return result;
+  }
+
+  private static void pushFrame(Deque<DfsFrame> callStack, long txnId, Map<Long, Set<Long>> graph,
+      Map<Long, Integer> index, Map<Long, Integer> lowlink,
+      Set<Long> onStack, Deque<Long> sccStack, int[] counter) {
+    index.put(txnId, counter[0]);
+    lowlink.put(txnId, counter[0]);
+    counter[0]++;
+    sccStack.push(txnId);
+    onStack.add(txnId);
+    callStack.push(new DfsFrame(txnId, graph.getOrDefault(txnId, Collections.emptySet()).iterator()));
+  }
+
+  /** Youngest eligible (highest txn ID) member, or null if every member is protected. */
+  private static Long pickVictim(List<Long> scc, Map<Long, TxnType> txnTypes) {
+    return scc.stream()
+        .filter(txn -> !isProtectedFromAbort(txnTypes.get(txn)))
+        .max(Comparator.naturalOrder())
+        .orElse(null);
+  }
+
+  /**
+   * Aborting REPL_CREATED would diverge source/destination state; SOFT_DELETE leaves the
+   * table half-deleted. {@code null} = type unrecognized by this build (newer client);
+   * treat as protected rather than guess.
+   */
+  private static boolean isProtectedFromAbort(TxnType type) {
+    return type == null
+        || type == TxnType.REPL_CREATED
+        || type == TxnType.SOFT_DELETE;
+  }
+
+  /**
+   * Listener exceptions must propagate: swallowing them would persist the abort locally
+   * while leaving replication unaware, diverging source/destination until timeout. The
+   * next scan re-detects the cycle.
+   */
+  private void notifyAbort(MultiDataSourceJdbcResource jdbcResource, long victim, TxnType txnType)
+      throws MetaException {
+    if (transactionalListeners == null || transactionalListeners.isEmpty()) {
+      return;
+    }
+    List<TxnWriteDetails> writeDetails = jdbcResource.execute(
+        new GetWriteIdsMappingForTxnIdsHandler(Collections.singleton(victim)));
+    notifyCommitOrAbortEvent(victim, EventMessage.EventType.ABORT_TXN,
+        txnType == null ? TxnType.DEFAULT : txnType,
+        jdbcResource.getConnection(), writeDetails, transactionalListeners);
+  }
+
+  private static String formatTxnList(List<Long> txns) {
+    return txns.stream()
+        .map(JavaUtils::txnIdToString)
+        .collect(Collectors.joining(", ", "[", "]"));
+  }
+
+  /** Heap-backed stack frame for the iterative Tarjan walker. */
+  private record DfsFrame(long txnId, Iterator<Long> blockers) {}
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/DeadlockDetectorService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/DeadlockDetectorService.java
new file mode 100644
index 000000000000..a622f7092ebc
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/DeadlockDetectorService.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn.service;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.NoMutex;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Metastore background thread that drives {@link TxnStore#performDeadlockDetection()} on
+ * the configured interval, gated by the {@code DeadlockDetector} mutex.
+ */
+public class DeadlockDetectorService implements MetastoreTaskThread {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DeadlockDetectorService.class);
+
+  private Configuration conf;
+  private TxnStore txnHandler;
+  private boolean shouldUseMutex = true;
+
+  @Override
+  public void setConf(Configuration configuration) {
+    conf = configuration;
+    txnHandler = TxnUtils.getTxnStore(conf);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_DEADLOCK_DETECTOR_INTERVAL, unit);
+  }
+
+  @Override
+  public void run() {
+    if (!MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.TXN_DEADLOCK_DETECTOR_ENABLED)) {
+      return;
+    }
+    TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();
+    Timer scanTimer = Metrics.getOrCreateTimer(MetricsConstants.DEADLOCK_DETECTOR_SCAN_DURATION);
+    try (AutoCloseable ignored = mutex.acquireLock(TxnStore.MUTEX_KEY.DeadlockDetector.name())) {
+      // Time only the scan itself; mutex-wait must not pollute the duration histogram.
+      Timer.Context timerCtx = scanTimer != null ? scanTimer.time() : null;
+      try {
+        long start = System.currentTimeMillis();
+        int aborted = txnHandler.performDeadlockDetection();
+        long durationMs = System.currentTimeMillis() - start;
+        if (aborted > 0) {
+          LOG.info("Deadlock detector aborted {} txn(s) in {} ms.", aborted, durationMs);
+        } else {
+          LOG.debug("Deadlock detector found no cycles. Took {} ms.", durationMs);
+        }
+      } finally {
+        if (timerCtx != null) {
+          timerCtx.stop();
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("Deadlock detector thread interrupted: {}",
+          Thread.currentThread().getName(), e);
+    } catch (Throwable e) {
+      // Catch Throwable: ScheduledThreadPoolExecutor cancels the periodic task on any
+      // uncaught Throwable, which would silently disable detection until restart. Bump
+      // the scan-failures counter here too so a broken detector is observable in metrics
+      // and not just logs.
+      LOG.error("Unexpected throwable in thread: {}, message: {}",
+          Thread.currentThread().getName(), String.valueOf(e), e);
+      Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_DEADLOCK_DETECTOR_SCAN_FAILURES).inc();
+    }
+  }
+
+  @Override
+  public void enforceMutex(boolean enableMutex) {
+    this.shouldUseMutex = enableMutex;
+  }
+}