Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
* rollback (assuming the statement is well implemented)).
*
* This is done so that all commands run in a transaction which simplifies implementation and
* allows a simple implementation of multi-statement txns which don't require a lock manager
* capable of deadlock detection. (todo: not fully implemented; elaborate on how this LM works)
* allows a simple implementation of multi-statement txns. Deadlocks among waiting ACID
* txns are resolved metastore-side by {@code DeadlockDetectorService}, not by this client.
*
* Also, critically important, ensuring that everything runs in a transaction assigns an order
* to all operations in the system - needed for replication/DR.
Expand Down Expand Up @@ -323,10 +323,8 @@ private void markExplicitTransaction(QueryPlan queryPlan) throws LockException {
}
/**
* Ensures that the current SQL statement is appropriate for the current state of the
* Transaction Manager (e.g. can call commit unless you called start transaction)
* Transaction Manager (e.g. cannot call commit unless you called start transaction).
*
* Note that support for multi-statement txns is a work-in-progress so it's only supported in
* HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST.
* @param queryPlan
* @throws LockException
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13034,10 +13034,6 @@ else if(ast.getChild(0).getType() == HiveParser.TOK_FALSE) {
case HiveParser.TOK_START_TRANSACTION:
case HiveParser.TOK_COMMIT:
case HiveParser.TOK_ROLLBACK:
if(!(conf.getBoolVar(ConfVars.HIVE_IN_TEST) || conf.getBoolVar(ConfVars.HIVE_IN_TEZ_TEST))) {
throw new IllegalStateException(HiveOperation.operationForToken(ast.getToken().getType()) +
" is not supported yet.");
}
queryState.setCommandType(HiveOperation.operationForToken(ast.getToken().getType()));
return false;
}
Expand Down
428 changes: 428 additions & 0 deletions ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public class MetastoreConf {
static final String ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS =
"org.apache.hadoop.hive.metastore.txn.service.AcidOpenTxnsCounterService";
@VisibleForTesting
static final String DEADLOCK_DETECTOR_SERVICE_CLASS =
"org.apache.hadoop.hive.metastore.txn.service.DeadlockDetectorService";
@VisibleForTesting
static final String ICEBERG_TABLE_SNAPSHOT_EXPIRY_SERVICE_CLASS =
"org.apache.iceberg.mr.hive.metastore.task.IcebergHouseKeeperService";
public static final String METASTORE_AUTHENTICATION_LDAP_USERMEMBERSHIPKEY_NAME =
Expand Down Expand Up @@ -313,6 +316,17 @@ public enum ConfVars {
ACID_TXN_CLEANER_INTERVAL("metastore.acid.txn.cleaner.interval",
"hive.metastore.acid.txn.cleaner.interval", 10, TimeUnit.SECONDS,
"Time interval describing how often aborted and committed txns are cleaned."),
TXN_DEADLOCK_DETECTOR_ENABLED("metastore.txn.deadlock.detector.enabled",
"hive.metastore.txn.deadlock.detector.enabled", true,
"Run a metastore background thread that scans HIVE_LOCKS for wait-for cycles among "
+ "ACID transactions. The youngest eligible transaction in each cycle is aborted "
+ "with ABORT_DEADLOCK; REPL_CREATED and SOFT_DELETE are protected. When false, "
+ "cycles resolve only via lock/txn timeouts. Toggling requires a metastore "
+ "restart; this setting is not exposed via setMetaConf."),
TXN_DEADLOCK_DETECTOR_INTERVAL("metastore.txn.deadlock.detector.interval",
"hive.metastore.txn.deadlock.detector.interval", 5, TimeUnit.SECONDS,
"Interval between deadlock-detector scans. Lower values shorten detection latency "
+ "at the cost of extra metastore DB load."),
ADDED_JARS("metastore.added.jars.path", "hive.added.jars.path", "",
"This an internal parameter."),
AGGREGATE_STATS_CACHE_CLEAN_UNTIL("metastore.aggregate.stats.cache.clean.until",
Expand Down Expand Up @@ -1534,6 +1548,7 @@ public enum ConfVars {
COMPACTION_HOUSEKEEPER_SERVICE_CLASS + "," +
ACID_TXN_CLEANER_SERVICE_CLASS + "," +
ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS + "," +
DEADLOCK_DETECTOR_SERVICE_CLASS + "," +
MATERIALZIATIONS_REBUILD_LOCK_CLEANER_TASK_CLASS + "," +
PARTITION_MANAGEMENT_TASK_CLASS + "," +
ICEBERG_TABLE_SNAPSHOT_EXPIRY_SERVICE_CLASS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ public class MetricsConstants {
public static final String TOTAL_NUM_ABORTED_TXNS = "total_num_aborted_transactions";
public static final String TOTAL_NUM_COMMITTED_TXNS = "total_num_committed_transactions";
public static final String TOTAL_NUM_TIMED_OUT_TXNS = "total_num_timed_out_transactions";
// Metric keys related to deadlock detection. NOTE(review): where each key is updated is not
// visible here - presumably by the metastore deadlock detector; confirm against its code.
public static final String TOTAL_NUM_DEADLOCKED_TXNS = "total_num_deadlocked_transactions";
public static final String TOTAL_NUM_DEADLOCK_DETECTOR_GRAPH_TOO_LARGE =
"total_num_deadlock_detector_graph_too_large";
public static final String TOTAL_NUM_DEADLOCK_DETECTOR_SCAN_FAILURES =
"total_num_deadlock_detector_scan_failures";
public static final String DEADLOCK_DETECTOR_SCAN_DURATION =
"deadlock_detector_scan_duration";

public static final String OPEN_CONNECTIONS = "open_connections";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public enum TxnErrorMsg {
ABORT_COMPACTION_TXN(50006, "compaction transaction abort"),
ABORT_MSCK_TXN(50007, "msck transaction abort"),
ABORT_MIGRATION_TXN(50008, "managed migration transaction abort"),
ABORT_DEADLOCK(50009, "deadlock detected"),

// Replication related aborts - 51000 - 51099
ABORT_DEFAULT_REPL_TXN(51000, "Replication:" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,12 @@ public void performTimeOuts() {
new PerformTimeoutsFunction(timeout, replicationTxnTimeout, transactionalListeners).execute(jdbcResource);
}

/**
 * Runs a {@code DeadlockDetectionFunction} built from the transactional listeners against
 * the JDBC resource and reports its result as a primitive count.
 *
 * @return the value produced by the function, or 0 when the function yields {@code null}
 * @throws MetaException propagated from the underlying execution
 */
@Override
public int performDeadlockDetection() throws MetaException {
  Integer victimCount = new DeadlockDetectionFunction(transactionalListeners).execute(jdbcResource);
  // A null result is reported as "nothing aborted" rather than being unboxed.
  if (victimCount == null) {
    return 0;
  }
  return victimCount;
}

@Override
public void countOpenTxns() throws MetaException {
int openTxns = jdbcResource.execute(new CountOpenTxnsHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public interface TxnStore extends Configurable {

enum MUTEX_KEY {
Initiator, Cleaner, HouseKeeper, IcebergHouseKeeper, TxnCleaner,
CompactionScheduler, MaterializationRebuild
CompactionScheduler, MaterializationRebuild, DeadlockDetector
}
// Compactor states (Should really be enum)
String INITIATED_RESPONSE = "initiated";
Expand Down Expand Up @@ -565,6 +565,17 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old
@RetrySemantics.Idempotent
void performTimeOuts();

/**
* Scan HIVE_LOCKS for wait-for cycles and abort the youngest eligible txn in each
* (REPL_CREATED/SOFT_DELETE protected). Driven by {@code DeadlockDetectorService}; tests
* may invoke synchronously.
*
* @return number of transactions aborted as deadlock victims
* @throws MetaException declared failure type; NOTE(review): the conditions under which the
* scan raises it are not visible from this interface - confirm against the implementation
*/
@Transactional(POOL_TX)
@RetrySemantics.Idempotent
int performDeadlockDetection() throws MetaException;

/**
* This will look through the completed_txn_components table and look for partitions or tables
* that may be ready for compaction. Also, look through txns and txn_components tables for
Expand Down
Loading
Loading