From 4cc5b9086396290641f67c09ec913229a654003f Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Thu, 30 Apr 2026 15:22:09 +0000 Subject: [PATCH 01/18] Experiment v1 --- .../resources/error/error-conditions.json | 8 ++++ .../analysis/RelationResolution.scala | 3 ++ .../UnresolveRelationsInTransaction.scala | 18 ++++--- .../connector/catalog/CatalogManager.scala | 13 +++++ .../sql/connector/catalog/LookupCatalog.scala | 11 ++++- .../TransactionAwareCatalogManager.scala | 15 ++++++ .../sql/errors/QueryCompilationErrors.scala | 10 ++++ .../internal/BaseSessionStateBuilder.scala | 20 +++++++- .../AppendDataTransactionSuite.scala | 23 ++++++++- .../connector/MergeIntoDataFrameSuite.scala | 32 ++++++++++++- .../PathBasedTableTransactionSuite.scala | 48 +++++++++++++++---- 11 files changed, 180 insertions(+), 21 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index fb0bb87172a8e..672772c467716 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -7234,6 +7234,14 @@ ], "sqlState" : "42601" }, + "TRANSACTION_MULTI_CATALOG_NOT_SUPPORTED" : { + "message" : [ + "All tables participating in a transactional operation must belong to the same catalog.", + "The write target belongs to catalog '', but the query also references", + "table(s) from catalog(s): ." + ], + "sqlState" : "0A000" + }, "TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE" : { "message" : [ "If Avro encoding is enabled, all the fields in the schema for column family must be nullable", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index 8769d1c4e4ffa..af84267fa0ffd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -295,8 +295,11 @@ class RelationResolution( // we don't share-cache views. val table: Option[Table] = tableOrView.filter { case t: MetadataTable if t.getTableInfo.isInstanceOf[ViewInfo] => false + case t: V1Table if t.catalogTable.tableType == CatalogTableType.VIEW => false case _ => true } + // Enforce single-catalog isolation for all table loads inside a transaction. + table.foreach(_ => catalogManager.validateCatalogForTableLoad(catalog)) val sharedRelationCacheMatch = for { t <- table diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala index 8ee64e32376fb..48e8fbe424af0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala @@ -21,16 +21,17 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TransactionalWr import org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.allowInvokingTransformsInAnalyzer import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog} +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation /** - * When a transaction is active, converts resolved [[DataSourceV2Relation]] nodes back to - * [[V2TableReference]] placeholders for all relations loaded by a catalog with the same - * name as the transaction catalog. - * - * This forces re-resolution of those relations against the transaction's catalog, which - * intercepts [[TableCatalog#loadTable]] calls to track which tables are read as part of - * the transaction. + * When a transaction is active, inspects all resolved [[DataSourceV2Relation]] nodes inside a + * [[TransactionalWrite]] subtree: + * - Relations from the transaction's catalog are converted back to [[V2TableReference]] + * placeholders, forcing re-resolution so that [[TableCatalog#loadTable]] is intercepted + * by the transaction catalog to track reads. + * - If any relation from a different catalog is detected we produce an error. We only support + * single-catalog transactions so that the transaction catalog can track all accessed tables. */ class UnresolveRelationsInTransaction(val catalogManager: CatalogManager) extends Rule[LogicalPlan] with LookupCatalog { @@ -57,6 +58,9 @@ class UnresolveRelationsInTransaction(val catalogManager: CatalogManager) plan.transform { case r: DataSourceV2Relation if isLoadedFromCatalog(r, catalog) => V2TableReference.createForTransaction(r) + case r: DataSourceV2Relation if r.catalog.isDefined && r.identifier.isDefined => + throw QueryCompilationErrors.transactionMultiCatalogNotSupportedError( + catalog.name(), r.catalog.get.name()) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 3aad52dbd1d01..d7c7ae771f552 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -74,6 +74,19 @@ class CatalogManager( def withTransaction(transaction: Transaction): CatalogManager = new TransactionAwareCatalogManager(this, transaction) + /** + * Called after a table is loaded during relation resolution. Overridden by + * [[TransactionAwareCatalogManager]] to enforce single-catalog isolation per transaction. + */ + def validateCatalogForTableLoad(catalog: CatalogPlugin): Unit = {} + + /** + * Returns the catalog name that owns path-based tables for the given data source format name, + * or None if the format is unknown or does not implement SupportsCatalogOptions. + * Overridden in sql/core via [[BaseSessionStateBuilder]] to use the real DataSource API. + */ + def catalogForDataSource(formatName: String): Option[String] = None + def isCatalogRegistered(name: String): Boolean = { try { catalog(name) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index 14c0663730324..a53d4ed117091 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -135,7 +135,16 @@ private[sql] trait LookupCatalog extends Logging { Some((catalog, ident)) } catch { case _: CatalogNotFoundException => - Some((currentCatalog, nameParts.asIdentifier)) + // For path-based data sources (e.g. `pathformat2.'/path/to/t'`): if the format + // implements SupportsCatalogOptions and returns a catalog name, route the identifier + // to that catalog. This ensures CREATE TABLE and DML both land in the same catalog, + // which is necessary for transactional routing via pathBased(). + val dataSourceCatalog = catalogManager.catalogForDataSource(nameParts.head).flatMap { + catName => + try Some(catalogManager.catalog(catName)) + catch { case _: CatalogNotFoundException => None } + } + Some((dataSourceCatalog.getOrElse(currentCatalog), nameParts.asIdentifier)) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala index 70079357b6dde..67d6e93c7f383 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.connector.catalog import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.catalog.TempVariableManager import org.apache.spark.sql.connector.catalog.transactions.Transaction +import org.apache.spark.sql.errors.QueryCompilationErrors /** * A [[CatalogManager]] decorator that redirects catalog lookups to the transaction's catalog @@ -46,6 +47,20 @@ private[sql] class TransactionAwareCatalogManager( if (txn.catalog.name() == resolved.name()) txn.catalog else resolved } + /** + * Validates that a table loaded during relation resolution belongs to the transaction catalog. + * All table loads during a transaction must come from the same catalog to ensure isolation. + */ + override def validateCatalogForTableLoad(catalog: CatalogPlugin): Unit = { + if (catalog.name() != txn.catalog.name()) { + throw QueryCompilationErrors.transactionMultiCatalogNotSupportedError( + txn.catalog.name(), catalog.name()) + } + } + + override def catalogForDataSource(formatName: String): Option[String] = + delegate.catalogForDataSource(formatName) + override def currentCatalog: CatalogPlugin = { val c = delegate.currentCatalog if (txn.catalog.name() == c.name()) txn.catalog else c diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 9b899867a9e37..c5400f5c59e90 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -1636,6 +1636,16 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "identifier" -> toSQLId(identifier))) } + def transactionMultiCatalogNotSupportedError( + txnCatalog: String, + foreignCatalog: String): Throwable = { + new AnalysisException( + errorClass = "TRANSACTION_MULTI_CATALOG_NOT_SUPPORTED", + messageParameters = Map( + "txnCatalog" -> txnCatalog, + "foreignCatalogs" -> foreignCatalog)) + } + def namespaceAlreadyExistsError(namespace: Array[String]): Throwable = { new NamespaceAlreadyExistsException(namespace) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index c82651595bc59..b4ad8cd5cec8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -29,7 +29,11 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.classic.{SparkSession, Strategy, StreamingCheckpointManager, StreamingQueryManager, UDFRegistration} -import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.{CatalogManager, SupportsCatalogOptions} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import scala.jdk.CollectionConverters._ +import scala.util.control.NonFatal import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser} import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder @@ -163,7 +167,19 @@ abstract class BaseSessionStateBuilder( protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog) protected lazy val catalogManager = { - val cm = new CatalogManager(v2SessionCatalog, catalog) + val cm = new CatalogManager(v2SessionCatalog, catalog) { + override def catalogForDataSource(formatName: String): Option[String] = + try { + DataSource.lookupDataSourceV2(formatName, this.conf).flatMap { + case sco: SupportsCatalogOptions => + val options = DataSourceV2Utils.extractSessionConfigs(sco, this.conf) + Option(sco.extractCatalog(new CaseInsensitiveStringMap(options.asJava))) + case _ => None + } + } catch { + case NonFatal(_) => None + } + } parentState.foreach(ps => cm.copySessionPathFrom(ps.catalogManager)) cm } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AppendDataTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AppendDataTransactionSuite.scala index aef9c65550fc4..372cde7443102 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AppendDataTransactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AppendDataTransactionSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.Row -import org.apache.spark.sql.connector.catalog.{Aborted, Committed} +import org.apache.spark.sql.connector.catalog.{Aborted, Committed, InMemoryRowLevelOperationTableCatalog} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode @@ -474,6 +474,27 @@ class AppendDataTransactionSuite extends RowLevelOperationSuiteBase { Row(12, 888, "hr", false))) } + test("multi-catalog query in transaction is rejected") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |""".stripMargin) + + withSQLConf("spark.sql.catalog.cat2" -> + classOf[InMemoryRowLevelOperationTableCatalog].getName) { + sql("CREATE TABLE cat2.ns1.source (pk INT NOT NULL, salary INT, dep STRING)") + sql("INSERT INTO cat2.ns1.source VALUES (10, 999, 'hr')") + + val e = intercept[AnalysisException] { + sql(s"INSERT INTO $tableNameAsString SELECT * FROM cat2.ns1.source") + } + + checkError(e, "TRANSACTION_MULTI_CATALOG_NOT_SUPPORTED", + parameters = Map("txnCatalog" -> "cat", "foreignCatalogs" -> "cat2")) + assert(catalog.lastTransaction.currentState === Aborted) + assert(catalog.lastTransaction.isClosed) + } + } + test("SQL INSERT WITH SCHEMA EVOLUTION analysis failure aborts transaction") { createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", """{ "pk": 1, "salary": 100, "dep": "hr" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala index d2dfae13b511c..f0bd943c94cfb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.{sources, Column, Row} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.classic.MergeIntoWriter -import org.apache.spark.sql.connector.catalog.{Aborted, Committed} +import org.apache.spark.sql.connector.catalog.{Aborted, Committed, InMemoryRowLevelOperationTableCatalog} import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.TableInfo import org.apache.spark.sql.functions._ @@ -1184,6 +1184,36 @@ class MergeIntoDataFrameSuite extends RowLevelOperationSuiteBase { } } + test("merge with pre-analyzed source from a different catalog is rejected") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |""".stripMargin) + + withSQLConf("spark.sql.catalog.cat2" -> + classOf[InMemoryRowLevelOperationTableCatalog].getName) { + sql("CREATE TABLE cat2.ns1.source (pk INT NOT NULL, salary INT, dep STRING)") + sql("INSERT INTO cat2.ns1.source VALUES (10, 999, 'hr')") + + val sourceDF = spark.table("cat2.ns1.source").as("source") + sourceDF.queryExecution.assertAnalyzed() + + val e = intercept[AnalysisException] { + sourceDF + .mergeInto(tableNameAsString, $"source.pk" === targetTableCol("pk")) + .whenMatched() + .updateAll() + .whenNotMatched() + .insertAll() + .merge() + } + + checkError(e, "TRANSACTION_MULTI_CATALOG_NOT_SUPPORTED", + parameters = Map("txnCatalog" -> "cat", "foreignCatalogs" -> "cat2")) + assert(catalog.lastTransaction.currentState === Aborted) + assert(catalog.lastTransaction.isClosed) + } + } + test("SPARK-54444: any schema changes after analysis are prohibited") { import org.apache.spark.sql.connector.catalog.Column val sourceTable = "cat.ns1.source_table" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala index c81f53673af3a..dd85f2f74ec1a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala @@ -107,15 +107,6 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { val txnCat = spark.sessionState.catalogManager.catalog("txncat") .asInstanceOf[InMemoryRowLevelOperationTableCatalog] - // Non-transactional catalog configured. - withSQLConf("spark.datasource.pathformat2.catalog" -> "nontxncat") { - createPathTable("pathformat2.`/path/to/t1`") - sql("INSERT INTO pathformat2.`/path/to/t1` VALUES (1, 'a')") - // The transaction was not routed to any of the transactional catalogs. - assert(catalog.lastTransaction == null) - assert(txnCat.lastTransaction == null) - } - // Transactional catalog configured: pathBased resolves txncat as a // TransactionalCatalogPlugin and opens the transaction there instead. withSQLConf("spark.datasource.pathformat2.catalog" -> "txncat") { @@ -124,6 +115,17 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { assert(txnCat.lastTransaction.currentState === Committed) assert(txnCat.lastTransaction.isClosed) } + + txnCat.lastTransaction = null // reset to distinguish from block 1 + + // Non-transactional catalog configured. + withSQLConf("spark.datasource.pathformat2.catalog" -> "nontxncat") { + createPathTable("pathformat2.`/path/to/t1`") + sql("INSERT INTO pathformat2.`/path/to/t1` VALUES (1, 'a')") + // The transaction was not routed to any of the transactional catalogs. + assert(catalog.lastTransaction == null) + assert(txnCat.lastTransaction == null) + } } } @@ -205,6 +207,34 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { assert(txn.currentState === Aborted) assert(txn.isClosed) } + + test("path-based write with same-catalog source succeeds") { + createPathTable(tablePathWithFormat) + // ns1.source is resolved via the current catalog (spark_catalog), same as the write target. + sql("CREATE TABLE ns1.source (id INT, data STRING)") + sql("INSERT INTO ns1.source VALUES (1, 'a')") + + val (txn, _) = executeTransaction { + sql(s"INSERT INTO $tablePathWithFormat SELECT * FROM ns1.source") + } + assert(txn.currentState === Committed) + assert(txn.isClosed) + checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Nil) + } + + test("path-based write with source from different catalog is rejected") { + createPathTable(tablePathWithFormat) + // cat is a different catalog from spark_catalog (the path-based catalog). + sql("CREATE TABLE cat.ns1.source (id INT, data STRING)") + + val e = intercept[AnalysisException] { + sql(s"INSERT INTO $tablePathWithFormat SELECT * FROM cat.ns1.source") + } + checkError(e, "TRANSACTION_MULTI_CATALOG_NOT_SUPPORTED", + parameters = Map("txnCatalog" -> "spark_catalog", "foreignCatalogs" -> "cat")) + assert(catalog.lastTransaction.currentState === Aborted) + assert(catalog.lastTransaction.isClosed) + } } /** From 4c2ee364c9a0608a447e3d187907e9338fc7b58b Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Fri, 22 May 2026 14:03:20 +0000 Subject: [PATCH 02/18] fixes --- .../spark/sql/connector/catalog/LookupCatalog.scala | 7 ++++--- .../spark/sql/internal/BaseSessionStateBuilder.scala | 11 +++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index a53d4ed117091..bad0a6e033ab4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -139,11 +139,12 @@ private[sql] trait LookupCatalog extends Logging { // implements SupportsCatalogOptions and returns a catalog name, route the identifier // to that catalog. This ensures CREATE TABLE and DML both land in the same catalog, // which is necessary for transactional routing via pathBased(). - val dataSourceCatalog = catalogManager.catalogForDataSource(nameParts.head).flatMap { - catName => + val dataSourceCatalog = Option(catalogManager.catalogForDataSource(nameParts.head)) + .flatten + .flatMap { catName => try Some(catalogManager.catalog(catName)) catch { case _: CatalogNotFoundException => None } - } + } Some((dataSourceCatalog.getOrElse(currentCatalog), nameParts.asIdentifier)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index b4ad8cd5cec8d..e0c5339e034e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -16,6 +16,9 @@ */ package org.apache.spark.sql.internal +import scala.jdk.CollectionConverters._ +import scala.util.control.NonFatal + import org.apache.spark.annotation.Unstable import org.apache.spark.sql.{DataSourceRegistration, ExperimentalMethods, SparkSessionExtensions, UDTFRegistration} import org.apache.spark.sql.artifact.ArtifactManager @@ -30,10 +33,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.classic.{SparkSession, Strategy, StreamingCheckpointManager, StreamingQueryManager, UDFRegistration} import org.apache.spark.sql.connector.catalog.{CatalogManager, SupportsCatalogOptions} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils -import org.apache.spark.sql.util.CaseInsensitiveStringMap -import scala.jdk.CollectionConverters._ -import scala.util.control.NonFatal import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser} import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder @@ -41,12 +40,12 @@ import org.apache.spark.sql.execution.aggregate.{ResolveEncodersInScalaAgg, Scal import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin import org.apache.spark.sql.execution.command.{CheckViewReferences, CommandCheck} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.v2.{TableCapabilityCheck, V2SessionCatalog} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, TableCapabilityCheck, V2SessionCatalog} import org.apache.spark.sql.execution.externalUDF.{ClassicExternalUDFPlanner, ExternalUDFPlanner, UnifiedExternalUDFPlanner} import org.apache.spark.sql.execution.streaming.runtime.ResolveWriteToStream import org.apache.spark.sql.expressions.UserDefinedAggregateFunction -import org.apache.spark.sql.util.ExecutionListenerManager +import org.apache.spark.sql.util.{CaseInsensitiveStringMap, ExecutionListenerManager} /** * Builder class that coordinates construction of a new [[SessionState]]. From 70f5228e3313f961eb070ebf4c7835ca9ab5fb4b Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Fri, 1 May 2026 09:05:15 +0000 Subject: [PATCH 03/18] Improvements --- .../UnresolveRelationsInTransaction.scala | 30 +++++++++++-------- .../connector/catalog/CatalogManager.scala | 11 ++++--- .../sql/connector/catalog/LookupCatalog.scala | 24 +++++++-------- .../TransactionAwareCatalogManager.scala | 8 ++--- .../internal/BaseSessionStateBuilder.scala | 29 +++++++++--------- .../PathBasedTableTransactionSuite.scala | 22 +++++++------- 6 files changed, 66 insertions(+), 58 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala index 48e8fbe424af0..a7070557f18d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala @@ -25,13 +25,17 @@ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation /** - * When a transaction is active, inspects all resolved [[DataSourceV2Relation]] nodes inside a - * [[TransactionalWrite]] subtree: - * - Relations from the transaction's catalog are converted back to [[V2TableReference]] - * placeholders, forcing re-resolution so that [[TableCatalog#loadTable]] is intercepted - * by the transaction catalog to track reads. - * - If any relation from a different catalog is detected we produce an error. We only support - * single-catalog transactions so that the transaction catalog can track all accessed tables. + * When a transaction is active, converts resolved [[DataSourceV2Relation]] nodes back to + * [[V2TableReference]] placeholders for all relations loaded by a catalog with the same + * name as the transaction catalog. + * + * This forces re-resolution of those relations against the transaction's catalog, which + * intercepts [[TableCatalog#loadTable]] calls to track which tables are read as part of + * the transaction. + * + * Note: if any resolved relation is loaded from a different catalog we throw an error. + * We only support single catalog transactions. This allows us to track all tables that + * participate in the transaction through a single point. */ class UnresolveRelationsInTransaction(val catalogManager: CatalogManager) extends Rule[LogicalPlan] with LookupCatalog { @@ -56,17 +60,19 @@ class UnresolveRelationsInTransaction(val catalogManager: CatalogManager) plan: LogicalPlan, catalog: CatalogPlugin): LogicalPlan = { plan.transform { - case r: DataSourceV2Relation if isLoadedFromCatalog(r, catalog) => + case r: DataSourceV2Relation if r.identifier.isDefined => + if (!isLoadedFromCatalog(r, catalog)) { + throw QueryCompilationErrors.transactionMultiCatalogNotSupportedError( + catalog.name(), r.catalog.get.name()) + } + V2TableReference.createForTransaction(r) - case r: DataSourceV2Relation if r.catalog.isDefined && r.identifier.isDefined => - throw QueryCompilationErrors.transactionMultiCatalogNotSupportedError( - catalog.name(), r.catalog.get.name()) } } private def isLoadedFromCatalog( relation: DataSourceV2Relation, catalog: CatalogPlugin): Boolean = { - relation.catalog.exists(_.name == catalog.name) && relation.identifier.isDefined + relation.catalog.exists(_.name == catalog.name) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index d7c7ae771f552..28a0ea0c2cb98 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -46,7 +46,9 @@ import org.apache.spark.sql.internal.SQLConf private[sql] class CatalogManager( val defaultSessionCatalog: CatalogPlugin, - val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging { + val v1SessionCatalog: SessionCatalog, + val dataSourceCatalogResolver: DataSourceCatalogResolver = + DataSourceCatalogResolver.NoOp) extends SQLConfHelper with Logging { import CatalogManager.SESSION_CATALOG_NAME import CatalogV2Util._ @@ -82,10 +84,11 @@ class CatalogManager( /** * Returns the catalog name that owns path-based tables for the given data source format name, - * or None if the format is unknown or does not implement SupportsCatalogOptions. - * Overridden in sql/core via [[BaseSessionStateBuilder]] to use the real DataSource API. + * or None if the format is unknown or does not implement [[SupportsCatalogOptions]]. Delegates + * to [[dataSourceCatalogResolver]], which is supplied by sql/core's session-state builder. */ - def catalogForDataSource(formatName: String): Option[String] = None + def catalogForDataSource(formatName: String): Option[String] = + dataSourceCatalogResolver.catalogFor(formatName) def isCatalogRegistered(name: String): Boolean = { try { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index bad0a6e033ab4..7d3c59619bc0a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -121,9 +121,17 @@ private[sql] trait LookupCatalog extends Logging { // this custom catalog can't be accessed. Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier)) } else { + // Path-based data sources (e.g. `pathformat2.'/path/to/t'`) whose format declares a + // catalog via SupportsCatalogOptions are routed to that catalog with the full nameParts + // as the identifier. + val (catalogName, ident) = + Option(catalogManager.catalogForDataSource(nameParts.head)).flatten match { + case Some(catName) => (catName, nameParts.asIdentifier) + case None => (nameParts.head, nameParts.tail.asIdentifier) + } + try { - val catalog = catalogManager.catalog(nameParts.head) - val ident = nameParts.tail.asIdentifier + val catalog = catalogManager.catalog(catalogName) if (CatalogV2Util.isSessionCatalog(catalog)) { // Reject only when namespace is empty (e.g. spark_catalog.t with no database). // Allow multi-part namespace for metadata tables (e.g. default.table.snapshots). @@ -135,17 +143,7 @@ private[sql] trait LookupCatalog extends Logging { Some((catalog, ident)) } catch { case _: CatalogNotFoundException => - // For path-based data sources (e.g. `pathformat2.'/path/to/t'`): if the format - // implements SupportsCatalogOptions and returns a catalog name, route the identifier - // to that catalog. This ensures CREATE TABLE and DML both land in the same catalog, - // which is necessary for transactional routing via pathBased(). - val dataSourceCatalog = Option(catalogManager.catalogForDataSource(nameParts.head)) - .flatten - .flatMap { catName => - try Some(catalogManager.catalog(catName)) - catch { case _: CatalogNotFoundException => None } - } - Some((dataSourceCatalog.getOrElse(currentCatalog), nameParts.asIdentifier)) + Some((currentCatalog, nameParts.asIdentifier)) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala index 67d6e93c7f383..10fea1ca1fb61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala @@ -33,7 +33,10 @@ import org.apache.spark.sql.errors.QueryCompilationErrors private[sql] class TransactionAwareCatalogManager( delegate: CatalogManager, txn: Transaction) - extends CatalogManager(delegate.defaultSessionCatalog, delegate.v1SessionCatalog) { + extends CatalogManager( + delegate.defaultSessionCatalog, + delegate.v1SessionCatalog, + delegate.dataSourceCatalogResolver) { override val tempVariableManager: TempVariableManager = delegate.tempVariableManager @@ -58,9 +61,6 @@ private[sql] class TransactionAwareCatalogManager( } } - override def catalogForDataSource(formatName: String): Option[String] = - delegate.catalogForDataSource(formatName) - override def currentCatalog: CatalogPlugin = { val c = delegate.currentCatalog if (txn.catalog.name() == c.name()) txn.catalog else c diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index e0c5339e034e8..4ad9decda06c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.classic.{SparkSession, Strategy, StreamingCheckpointManager, StreamingQueryManager, UDFRegistration} -import org.apache.spark.sql.connector.catalog.{CatalogManager, SupportsCatalogOptions} +import org.apache.spark.sql.connector.catalog.{CatalogManager, DataSourceCatalogResolver, SupportsCatalogOptions} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser} import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder @@ -165,20 +165,21 @@ abstract class BaseSessionStateBuilder( protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog) - protected lazy val catalogManager = { - val cm = new CatalogManager(v2SessionCatalog, catalog) { - override def catalogForDataSource(formatName: String): Option[String] = - try { - DataSource.lookupDataSourceV2(formatName, this.conf).flatMap { - case sco: SupportsCatalogOptions => - val options = DataSourceV2Utils.extractSessionConfigs(sco, this.conf) - Option(sco.extractCatalog(new CaseInsensitiveStringMap(options.asJava))) - case _ => None - } - } catch { - case NonFatal(_) => None + protected lazy val dataSourceCatalogResolver: DataSourceCatalogResolver = + (formatName: String) => + try { + DataSource.lookupDataSourceV2(formatName, conf).flatMap { + case sco: SupportsCatalogOptions => + val options = DataSourceV2Utils.extractSessionConfigs(sco, conf) + Option(sco.extractCatalog(new CaseInsensitiveStringMap(options.asJava))) + case _ => None } - } + } catch { + case NonFatal(_) => None + } + + protected lazy val catalogManager = { + val cm = new CatalogManager(v2SessionCatalog, catalog, dataSourceCatalogResolver) parentState.foreach(ps => cm.copySessionPathFrom(ps.catalogManager)) cm } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala index dd85f2f74ec1a..db1eb880d4182 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala @@ -107,17 +107,6 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { val txnCat = spark.sessionState.catalogManager.catalog("txncat") .asInstanceOf[InMemoryRowLevelOperationTableCatalog] - // Transactional catalog configured: pathBased resolves txncat as a - // TransactionalCatalogPlugin and opens the transaction there instead. - withSQLConf("spark.datasource.pathformat2.catalog" -> "txncat") { - createPathTable("pathformat2.`/path/to/t2`") - sql("INSERT INTO pathformat2.`/path/to/t2` VALUES (1, 'a')") - assert(txnCat.lastTransaction.currentState === Committed) - assert(txnCat.lastTransaction.isClosed) - } - - txnCat.lastTransaction = null // reset to distinguish from block 1 - // Non-transactional catalog configured. withSQLConf("spark.datasource.pathformat2.catalog" -> "nontxncat") { createPathTable("pathformat2.`/path/to/t1`") @@ -126,6 +115,17 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { assert(catalog.lastTransaction == null) assert(txnCat.lastTransaction == null) } + + txnCat.lastTransaction = null // Reset to distinguish from block 1. + + // Transactional catalog configured: pathBased resolves txncat as a + // TransactionalCatalogPlugin and opens the transaction there instead. + withSQLConf("spark.datasource.pathformat2.catalog" -> "txncat") { + createPathTable("pathformat2.`/path/to/t2`") + sql("INSERT INTO pathformat2.`/path/to/t2` VALUES (1, 'a')") + assert(txnCat.lastTransaction.currentState === Committed) + assert(txnCat.lastTransaction.isClosed) + } } } From 1c6a8b8099f082074a6a2b0b45469ac545166be3 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Fri, 1 May 2026 09:05:41 +0000 Subject: [PATCH 04/18] DataSourceCatalogResolver --- .../catalog/DataSourceCatalogResolver.scala | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/DataSourceCatalogResolver.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/DataSourceCatalogResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/DataSourceCatalogResolver.scala new file mode 100644 index 0000000000000..e4cf2a204f49e --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/DataSourceCatalogResolver.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog + +/** + * Maps a path-based data source format name (e.g. `parquet`, `delta`) to the catalog that owns + * its tables, when the format implements [[SupportsCatalogOptions]]. The real implementation + * lives in sql/core; catalyst sees only this interface. + */ +private[sql] trait DataSourceCatalogResolver { + def catalogFor(formatName: String): Option[String] +} + +private[sql] object DataSourceCatalogResolver { + val NoOp: DataSourceCatalogResolver = (_: String) => None +} From 6eccb38e3ffcd6470a90ff7fee3be24976fde9a9 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Fri, 1 May 2026 09:32:22 +0000 Subject: [PATCH 05/18] Remove path based from QE --- .../spark/sql/execution/QueryExecution.scala | 50 +++---------------- 1 file changed, 6 insertions(+), 44 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 65bc57de907b2..3ca3032be485b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -22,7 +22,6 @@ import java.util.UUID import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import javax.annotation.concurrent.GuardedBy -import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal import org.apache.hadoop.fs.Path @@ -32,22 +31,21 @@ import org.apache.spark.internal.LogKeys.EXTENDED_EXPLAIN_GENERATOR import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row} import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubqueryAliases, LazyExpression, NameParameterizedQuery, UnresolvedRelation, UnsupportedOperationChecker} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, LazyExpression, NameParameterizedQuery, UnsupportedOperationChecker} import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, TransactionalWrite => TransactionalWritePlan, Union, UnresolvedWith, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, Union, UnresolvedWith, WithCTE} import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} import org.apache.spark.sql.catalyst.transactions.TransactionUtils import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.classic.SparkSession -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, LookupCatalog, SupportsCatalogOptions, TableCatalog, TransactionalCatalogPlugin} +import org.apache.spark.sql.connector.catalog.LookupCatalog import org.apache.spark.sql.connector.catalog.transactions.Transaction import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ROOT_ID_KEY import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan} import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan} -import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, TransactionalExec, V2TableRefreshUtil} +import org.apache.spark.sql.execution.datasources.v2.{TransactionalExec, V2TableRefreshUtil} import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters import org.apache.spark.sql.execution.exchange.EnsureRequirements import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery @@ -56,7 +54,6 @@ import org.apache.spark.sql.execution.streaming.runtime.{IncrementalExecution, W import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.scripting.SqlScriptingExecution import org.apache.spark.sql.streaming.OutputMode -import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.{LazyTry, Utils, UUIDv7Generator} import org.apache.spark.util.ArrayImplicits._ @@ -119,16 +116,9 @@ class QueryExecution( analyzerOpt.flatMap(_.catalogManager.transaction).orElse { // Only begin a new transaction for outer QEs that lead to execution. if (mode != CommandExecutionMode.SKIP) { - def resolve(w: TransactionalWritePlan): Option[TransactionalCatalogPlugin] = - pathBased(w) match { - case Some(c: TransactionalCatalogPlugin) => Some(c) - case Some(_) => None - // If the path is not data source based, fallback to catalog based resolution. - case None => TransactionalWrite.unapply(w) - } val catalog = logical match { - case UnresolvedWith(w: TransactionalWritePlan, _, _) => resolve(w) - case w: TransactionalWritePlan => resolve(w) + case UnresolvedWith(TransactionalWrite(c), _, _) => Some(c) + case TransactionalWrite(c) => Some(c) case _ => None } catalog.map(TransactionUtils.beginTransaction) @@ -139,34 +129,6 @@ class QueryExecution( } private def transactionOpt: Option[Transaction] = lazyTransactionOpt.get - // For path-based tables (e.g. `format.`/path/to/table``) the first identifier part is a - // connector name. SupportsCatalogOptions on the connector tells us which catalog actually - // owns the table. Returns Some(catalog) if parts.head is a recognized SupportsCatalogOptions - // data source (caller decides whether the catalog is transactional), or None to fall through - // to the catalog-based extractor. - private def pathBased(write: TransactionalWritePlan): Option[TableCatalog] = - EliminateSubqueryAliases(write.table) match { - case UnresolvedRelation(parts, _, _) if parts.length > 1 => - try { - DataSource.lookupDataSourceV2(parts.head, sparkSession.sessionState.conf) - .collect { case sco: SupportsCatalogOptions => sco } - .map { sco => - val sessionConfigs = DataSourceV2Utils.extractSessionConfigs( - sco, sparkSession.sessionState.conf) - // Pass the entire identifier as option. The connector can decide how to parse it - // if needed. - val options = sessionConfigs + ("identifier" -> parts.mkString(".")) - CatalogV2Util.getTableProviderCatalog( - sco, catalogManager, new CaseInsensitiveStringMap(options.asJava)) - } - } catch { - // The head of the multipart identifier is not a registered data source. - // Fallback to catalog-based detection. - case _: ClassNotFoundException => None - } - case _ => None - } - // Per-query analyzer: uses a transaction-aware CatalogManager when a transaction is active, // so that all catalog lookups and rule applications during analysis see the correct state // without relying on thread-local context. Any nested QueryExecution that is created during From 8fbadda91ddf0d776f154927a2c3da4427299a98 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Fri, 1 May 2026 13:44:49 +0000 Subject: [PATCH 06/18] Improvements --- .../analysis/RelationResolution.scala | 1 - .../connector/catalog/CatalogManager.scala | 19 +++++++--- .../catalog/DataSourceCatalogResolver.scala | 31 ---------------- .../PathBasedTableTransactionSuite.scala | 36 +++++++++++++++++-- 4 files changed, 48 insertions(+), 39 deletions(-) delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/DataSourceCatalogResolver.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index af84267fa0ffd..6aff45f153e2b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -295,7 +295,6 @@ class RelationResolution( // we don't share-cache views. val table: Option[Table] = tableOrView.filter { case t: MetadataTable if t.getTableInfo.isInstanceOf[ViewInfo] => false - case t: V1Table if t.catalogTable.tableType == CatalogTableType.VIEW => false case _ => true } // Enforce single-catalog isolation for all table loads inside a transaction. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 28a0ea0c2cb98..6da0da8dea3ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -32,6 +32,18 @@ import org.apache.spark.sql.connector.catalog.transactions.Transaction import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf +/** + * Maps a path-based data source format name (e.g. `parquet`, `delta`) to the catalog that owns + * its tables, when the format implements [[SupportsCatalogOptions]]. + */ +private[sql] trait DataSourceCatalogResolver { + def catalogFor(formatName: String): Option[String] +} + +private[sql] object DataSourceCatalogResolver { + val NoOp: DataSourceCatalogResolver = (_: String) => None +} + /** * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow * the caller to look up a catalog by name. @@ -77,15 +89,14 @@ class CatalogManager( new TransactionAwareCatalogManager(this, transaction) /** - * Called after a table is loaded during relation resolution. Overridden by + * Called to validate catalog for loading a table. Overridden by * [[TransactionAwareCatalogManager]] to enforce single-catalog isolation per transaction. */ def validateCatalogForTableLoad(catalog: CatalogPlugin): Unit = {} /** - * Returns the catalog name that owns path-based tables for the given data source format name, - * or None if the format is unknown or does not implement [[SupportsCatalogOptions]]. Delegates - * to [[dataSourceCatalogResolver]], which is supplied by sql/core's session-state builder. + * Returns the catalog name that owns path-based tables for the given data source format name. + * Returns None if the format is unknown or does not implement [[SupportsCatalogOptions]]. */ def catalogForDataSource(formatName: String): Option[String] = dataSourceCatalogResolver.catalogFor(formatName) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/DataSourceCatalogResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/DataSourceCatalogResolver.scala deleted file mode 100644 index e4cf2a204f49e..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/DataSourceCatalogResolver.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector.catalog - -/** - * Maps a path-based data source format name (e.g. `parquet`, `delta`) to the catalog that owns - * its tables, when the format implements [[SupportsCatalogOptions]]. The real implementation - * lives in sql/core; catalyst sees only this interface. - */ -private[sql] trait DataSourceCatalogResolver { - def catalogFor(formatName: String): Option[String] -} - -private[sql] object DataSourceCatalogResolver { - val NoOp: DataSourceCatalogResolver = (_: String) => None -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala index db1eb880d4182..03a89ea0d63cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.connector.catalog.{Aborted, Committed, Identifier, InMemoryRowLevelOperationTableCatalog, InMemoryTableCatalog, SessionConfigSupport, SharedTablesInMemoryRowLevelOperationTableCatalog, SupportsCatalogOptions} import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamingQueryWrapper} import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION +import org.apache.spark.sql.sources import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -38,6 +39,7 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { private val tablePath = "`/path/to/t`" private val tablePathWithFormat = "pathformat.`/path/to/t`" + private val tablePathWithFormat2 = "pathformat2.`/path/to/t`" override def beforeEach(): Unit = { super.beforeEach() @@ -212,13 +214,20 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { createPathTable(tablePathWithFormat) // ns1.source is resolved via the current catalog (spark_catalog), same as the write target. sql("CREATE TABLE ns1.source (id INT, data STRING)") - sql("INSERT INTO ns1.source VALUES (1, 'a')") + sql("INSERT INTO ns1.source VALUES (1, 'a'), (2, 'b')") - val (txn, _) = executeTransaction { - sql(s"INSERT INTO $tablePathWithFormat SELECT * FROM ns1.source") + val (txn, txnTables) = executeTransaction { + sql(s"INSERT INTO $tablePathWithFormat SELECT * FROM ns1.source WHERE id = 1") } assert(txn.currentState === Committed) assert(txn.isClosed) + // Source scan with predicate was tracked via the transaction catalog. + val sourceTxnTable = txnTables("spark_catalog.ns1.source") + assert(sourceTxnTable.scanEvents.size === 1) + assert(sourceTxnTable.scanEvents.flatten.exists { + case sources.EqualTo("id", 1) => true + case _ => false + }) checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Nil) } @@ -235,6 +244,27 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { assert(catalog.lastTransaction.currentState === Aborted) assert(catalog.lastTransaction.isClosed) } + + test("path-based write with source from session-config-routed catalog is rejected") { + withSQLConf( + "spark.sql.catalog.txncat" -> classOf[InMemoryRowLevelOperationTableCatalog].getName, + "spark.datasource.pathformat2.catalog" -> "txncat") { + // pathformat2 routes to txncat; create the source there. + createPathTable(tablePathWithFormat2) + sql(s"INSERT INTO $tablePathWithFormat2 VALUES (1, 'a')") + + // pathformat falls back to the session catalog (extractCatalog returns null). + createPathTable(tablePathWithFormat) + + val e = intercept[AnalysisException] { + sql(s"INSERT INTO $tablePathWithFormat SELECT * FROM $tablePathWithFormat2") + } + checkError(e, "TRANSACTION_MULTI_CATALOG_NOT_SUPPORTED", + parameters = Map("txnCatalog" -> "spark_catalog", "foreignCatalogs" -> "txncat")) + assert(catalog.lastTransaction.currentState === Aborted) + assert(catalog.lastTransaction.isClosed) + } + } } /** From 017a79b3120da65c0dd31d5ba793d103af7ecf79 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Thu, 7 May 2026 09:30:42 +0000 Subject: [PATCH 07/18] v1 view fix + cleaning --- .../apache/spark/sql/catalyst/analysis/RelationResolution.scala | 2 ++ .../org/apache/spark/sql/connector/catalog/CatalogManager.scala | 2 +- .../org/apache/spark/sql/connector/catalog/LookupCatalog.scala | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index 6aff45f153e2b..e702b829e220c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.{ CatalogTable, + CatalogTableType, TemporaryViewRelation, UnresolvedCatalogRelation } @@ -295,6 +296,7 @@ class RelationResolution( // we don't share-cache views. val table: Option[Table] = tableOrView.filter { case t: MetadataTable if t.getTableInfo.isInstanceOf[ViewInfo] => false + case t: V1Table if t.catalogTable.tableType == CatalogTableType.VIEW => false case _ => true } // Enforce single-catalog isolation for all table loads inside a transaction. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 6da0da8dea3ba..5d132f7a7cf56 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -99,7 +99,7 @@ class CatalogManager( * Returns None if the format is unknown or does not implement [[SupportsCatalogOptions]]. */ def catalogForDataSource(formatName: String): Option[String] = - dataSourceCatalogResolver.catalogFor(formatName) + Option(dataSourceCatalogResolver.catalogFor(formatName)).flatten def isCatalogRegistered(name: String): Boolean = { try { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index 7d3c59619bc0a..398f3aaec7b4a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -125,7 +125,7 @@ private[sql] trait LookupCatalog extends Logging { // catalog via SupportsCatalogOptions are routed to that catalog with the full nameParts // as the identifier. val (catalogName, ident) = - Option(catalogManager.catalogForDataSource(nameParts.head)).flatten match { + catalogManager.catalogForDataSource(nameParts.head) match { case Some(catName) => (catName, nameParts.asIdentifier) case None => (nameParts.head, nameParts.tail.asIdentifier) } From 8c263975c2124013e64d525701897ae2a7461503 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Thu, 7 May 2026 11:10:29 +0000 Subject: [PATCH 08/18] Move nullability guard back to LookupCatalog --- .../org/apache/spark/sql/connector/catalog/CatalogManager.scala | 2 +- .../org/apache/spark/sql/connector/catalog/LookupCatalog.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 5d132f7a7cf56..6da0da8dea3ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -99,7 +99,7 @@ class CatalogManager( * Returns None if the format is unknown or does not implement [[SupportsCatalogOptions]]. */ def catalogForDataSource(formatName: String): Option[String] = - Option(dataSourceCatalogResolver.catalogFor(formatName)).flatten + dataSourceCatalogResolver.catalogFor(formatName) def isCatalogRegistered(name: String): Boolean = { try { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index 398f3aaec7b4a..7d3c59619bc0a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -125,7 +125,7 @@ private[sql] trait LookupCatalog extends Logging { // catalog via SupportsCatalogOptions are routed to that catalog with the full nameParts // as the identifier. val (catalogName, ident) = - catalogManager.catalogForDataSource(nameParts.head) match { + Option(catalogManager.catalogForDataSource(nameParts.head)).flatten match { case Some(catName) => (catName, nameParts.asIdentifier) case None => (nameParts.head, nameParts.tail.asIdentifier) } From 1a313691d57320a6743640bfd063e4722c549742 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Thu, 7 May 2026 14:01:00 +0000 Subject: [PATCH 09/18] Improvements --- .../spark/sql/internal/BaseSessionStateBuilder.scala | 5 +++-- .../connector/PathBasedTableTransactionSuite.scala | 11 ++++------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 4ad9decda06c8..c6b57dbb65d56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.internal import scala.jdk.CollectionConverters._ -import scala.util.control.NonFatal import org.apache.spark.annotation.Unstable import org.apache.spark.sql.{DataSourceRegistration, ExperimentalMethods, SparkSessionExtensions, UDTFRegistration} @@ -175,7 +174,9 @@ abstract class BaseSessionStateBuilder( case _ => None } } catch { - case NonFatal(_) => None + // The format name is not a registered data source. Fall through and let the caller + // treat it as a catalog/namespace name. + case _: ClassNotFoundException => None } protected lazy val catalogManager = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala index 03a89ea0d63cf..ac7d3457e55ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala @@ -253,7 +253,7 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { createPathTable(tablePathWithFormat2) sql(s"INSERT INTO $tablePathWithFormat2 VALUES (1, 'a')") - // pathformat falls back to the session catalog (extractCatalog returns null). + // pathformat routes to the session catalog (default extractCatalog). createPathTable(tablePathWithFormat) val e = intercept[AnalysisException] { @@ -269,9 +269,9 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { /** * Simulates a path-based connector (e.g. Delta) that implements [[SupportsCatalogOptions]] - * to route `pathformat.\`/path/to/t\`` SQL identifiers to the session catalog. Returning - * null from [[extractCatalog]] signals that the session catalog (`spark_catalog`) owns the - * table, matching Delta's behavior where DeltaCatalog is registered as spark_catalog. + * to route `pathformat.\`/path/to/t\`` SQL identifiers to the session catalog. We rely on + * the default [[SupportsCatalogOptions#extractCatalog]] which returns + * [[org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME]]. */ class FakePathBasedSource extends FakeV2ProviderWithCustomSchema @@ -280,9 +280,6 @@ class FakePathBasedSource override def shortName(): String = "pathformat" - // Use the session catalog. - override def extractCatalog(options: CaseInsensitiveStringMap): String = null - // Not used in the transactional path. override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = null } From 6c3a8850775d8dea7eb137fdee61b665d68eb5a2 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Thu, 7 May 2026 14:26:10 +0000 Subject: [PATCH 10/18] Delegate identifier parsing to the connector for path based tables --- .../sql/connector/catalog/CatalogManager.scala | 18 ++++++++++-------- .../sql/connector/catalog/LookupCatalog.scala | 8 ++++---- .../sql/internal/BaseSessionStateBuilder.scala | 13 +++++++++---- .../PathBasedTableTransactionSuite.scala | 12 ++++++++---- 4 files changed, 31 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 6da0da8dea3ba..c0a8438145448 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -33,15 +33,16 @@ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf /** - * Maps a path-based data source format name (e.g. `parquet`, `delta`) to the catalog that owns - * its tables, when the format implements [[SupportsCatalogOptions]]. + * Resolves a multipart SQL identifier whose head names a path-based data source format + * (e.g. `pathformat.\`/path/to/t\``) to the owning catalog and a connector-canonicalized + * identifier, when the format implements [[SupportsCatalogOptions]]. */ private[sql] trait DataSourceCatalogResolver { - def catalogFor(formatName: String): Option[String] + def resolve(nameParts: Seq[String]): Option[(String, Identifier)] } private[sql] object DataSourceCatalogResolver { - val NoOp: DataSourceCatalogResolver = (_: String) => None + val NoOp: DataSourceCatalogResolver = (_: Seq[String]) => None } /** @@ -95,11 +96,12 @@ class CatalogManager( def validateCatalogForTableLoad(catalog: CatalogPlugin): Unit = {} /** - * Returns the catalog name that owns path-based tables for the given data source format name. - * Returns None if the format is unknown or does not implement [[SupportsCatalogOptions]]. + * Returns the catalog name and connector-canonicalized identifier for a multipart SQL name + * whose head is a [[SupportsCatalogOptions]] data source format. Returns None if the format + * head is unknown or does not implement [[SupportsCatalogOptions]]. */ - def catalogForDataSource(formatName: String): Option[String] = - dataSourceCatalogResolver.catalogFor(formatName) + def catalogAndIdentForDataSource(nameParts: Seq[String]): Option[(String, Identifier)] = + dataSourceCatalogResolver.resolve(nameParts) def isCatalogRegistered(name: String): Boolean = { try { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index 7d3c59619bc0a..aae5557bab326 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -122,11 +122,11 @@ private[sql] trait LookupCatalog extends Logging { Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier)) } else { // Path-based data sources (e.g. `pathformat2.'/path/to/t'`) whose format declares a - // catalog via SupportsCatalogOptions are routed to that catalog with the full nameParts - // as the identifier. + // catalog via SupportsCatalogOptions are routed to that catalog. Both the catalog and + // the canonical identifier come from the connector. val (catalogName, ident) = - Option(catalogManager.catalogForDataSource(nameParts.head)).flatten match { - case Some(catName) => (catName, nameParts.asIdentifier) + Option(catalogManager.catalogAndIdentForDataSource(nameParts)).flatten match { + case Some((catName, providerIdent)) => (catName, providerIdent) case None => (nameParts.head, nameParts.tail.asIdentifier) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index c6b57dbb65d56..e06ec1a0e2ef9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -165,12 +165,17 @@ abstract class BaseSessionStateBuilder( protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog) protected lazy val dataSourceCatalogResolver: DataSourceCatalogResolver = - (formatName: String) => + (nameParts: Seq[String]) => try { - DataSource.lookupDataSourceV2(formatName, conf).flatMap { + DataSource.lookupDataSourceV2(nameParts.head, conf).flatMap { case sco: SupportsCatalogOptions => - val options = DataSourceV2Utils.extractSessionConfigs(sco, conf) - Option(sco.extractCatalog(new CaseInsensitiveStringMap(options.asJava))) + // Pass the full multipart name as the `path` option so the connector can do its + // own canonicalization (strip its format prefix, normalize, etc.). Session configs + // are passed through as well. + val sessionOpts = DataSourceV2Utils.extractSessionConfigs(sco, conf) + val opts = new CaseInsensitiveStringMap( + (sessionOpts + ("path" -> nameParts.mkString("."))).asJava) + Some((sco.extractCatalog(opts), sco.extractIdentifier(opts))) case _ => None } } catch { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala index ac7d3457e55ad..ee6131fef0d40 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala @@ -280,8 +280,10 @@ class FakePathBasedSource override def shortName(): String = "pathformat" - // Not used in the transactional path. - override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = null + // Strip our own format prefix from the multipart path and return the rest under our + // format-name namespace. + override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = + Identifier.of(Array(shortName()), options.get("path").stripPrefix(s"${shortName()}.")) } /** @@ -301,6 +303,8 @@ class FakePathBasedSourceWithSessionConfig override def extractCatalog(options: CaseInsensitiveStringMap): String = options.get("catalog") - // Not used in the transactional path. - override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = null + // Strip our own format prefix from the multipart path and return the rest under our + // format-name namespace. + override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = + Identifier.of(Array(shortName()), options.get("path").stripPrefix(s"${shortName()}.")) } From f29732e5005d645e21d49ebb5ce7e56675ae9cb5 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Thu, 21 May 2026 12:02:08 +0000 Subject: [PATCH 11/18] Move Fake path based providers to FakeV@Provider --- .../spark/sql/connector/FakeV2Provider.scala | 45 +++++++++++++++++- .../PathBasedTableTransactionSuite.scala | 46 +------------------ 2 files changed, 45 insertions(+), 46 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala index d9faa7d7473dc..97a9fd06c9f17 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala @@ -19,9 +19,10 @@ package org.apache.spark.sql.connector import java.util -import org.apache.spark.sql.connector.catalog.{SupportsV1OverwriteWithSaveAsTable, Table, TableProvider} +import org.apache.spark.sql.connector.catalog.{Identifier, SessionConfigSupport, SupportsCatalogOptions, SupportsV1OverwriteWithSaveAsTable, Table, TableProvider} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder} +import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -93,3 +94,45 @@ class FakeV2ProviderWithV1SaveAsTableOverwriteWriteOptionDisabled with SupportsV1OverwriteWithSaveAsTable { override def addV1OverwriteWithSaveAsTableOption(): Boolean = false } + +/** + * Simulates a path-based connector (e.g. Delta) that implements [[SupportsCatalogOptions]] + * to route `pathformat.\`/path/to/t\`` SQL identifiers to the session catalog. We rely on + * the default [[SupportsCatalogOptions#extractCatalog]] which returns + * [[org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME]]. + */ +class FakePathBasedSource + extends FakeV2ProviderWithCustomSchema + with SupportsCatalogOptions + with DataSourceRegister { + + override def shortName(): String = "pathformat" + + // Strip our own format prefix from the multipart path and return the rest under our + // format-name namespace. + override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = + Identifier.of(Array(shortName()), options.get("path").stripPrefix(s"${shortName()}.")) +} + +/** + * Like [[FakePathBasedSource]] but resolves the owning catalog from the session config + * `spark.datasource.pathformat2.catalog` instead of always returning null. This simulates + * a connector that lets users configure the target catalog. + */ +class FakePathBasedSourceWithSessionConfig + extends FakeV2ProviderWithCustomSchema + with SupportsCatalogOptions + with SessionConfigSupport + with DataSourceRegister { + + override def shortName(): String = "pathformat2" + + override def keyPrefix: String = "pathformat2" + + override def extractCatalog(options: CaseInsensitiveStringMap): String = options.get("catalog") + + // Strip our own format prefix from the multipart path and return the rest under our + // format-name namespace. + override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = + Identifier.of(Array(shortName()), options.get("path").stripPrefix(s"${shortName()}.")) +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala index ee6131fef0d40..bb88a781e4815 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala @@ -19,13 +19,11 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.Row -import org.apache.spark.sql.connector.catalog.{Aborted, Committed, Identifier, InMemoryRowLevelOperationTableCatalog, InMemoryTableCatalog, SessionConfigSupport, SharedTablesInMemoryRowLevelOperationTableCatalog, SupportsCatalogOptions} +import org.apache.spark.sql.connector.catalog.{Aborted, Committed, InMemoryRowLevelOperationTableCatalog, InMemoryTableCatalog, SharedTablesInMemoryRowLevelOperationTableCatalog} import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamingQueryWrapper} import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources -import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.streaming.StreamingQuery -import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * Tests for transactional writes to path-based tables, where the table is identified by a @@ -266,45 +264,3 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { } } } - -/** - * Simulates a path-based connector (e.g. Delta) that implements [[SupportsCatalogOptions]] - * to route `pathformat.\`/path/to/t\`` SQL identifiers to the session catalog. We rely on - * the default [[SupportsCatalogOptions#extractCatalog]] which returns - * [[org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME]]. - */ -class FakePathBasedSource - extends FakeV2ProviderWithCustomSchema - with SupportsCatalogOptions - with DataSourceRegister { - - override def shortName(): String = "pathformat" - - // Strip our own format prefix from the multipart path and return the rest under our - // format-name namespace. - override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = - Identifier.of(Array(shortName()), options.get("path").stripPrefix(s"${shortName()}.")) -} - -/** - * Like [[FakePathBasedSource]] but resolves the owning catalog from the session config - * `spark.datasource.pathformat2.catalog` instead of always returning null. This simulates - * a connector that lets users configure the target catalog. - */ -class FakePathBasedSourceWithSessionConfig - extends FakeV2ProviderWithCustomSchema - with SupportsCatalogOptions - with SessionConfigSupport - with DataSourceRegister { - - override def shortName(): String = "pathformat2" - - override def keyPrefix: String = "pathformat2" - - override def extractCatalog(options: CaseInsensitiveStringMap): String = options.get("catalog") - - // Strip our own format prefix from the multipart path and return the rest under our - // format-name namespace. - override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = - Identifier.of(Array(shortName()), options.get("path").stripPrefix(s"${shortName()}.")) -} From 6a5d18139044192a54a7553a04327260daa8a0b7 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Wed, 20 May 2026 08:31:30 +0000 Subject: [PATCH 12/18] Remove single catalog enforcement --- .../resources/error/error-conditions.json | 8 ----- .../analysis/RelationResolution.scala | 4 --- .../UnresolveRelationsInTransaction.scala | 14 ++------ .../connector/catalog/CatalogManager.scala | 6 ---- .../TransactionAwareCatalogManager.scala | 12 ------- .../sql/errors/QueryCompilationErrors.scala | 10 ------ .../AppendDataTransactionSuite.scala | 23 +----------- .../connector/MergeIntoDataFrameSuite.scala | 32 +---------------- .../PathBasedTableTransactionSuite.scala | 35 ------------------- 9 files changed, 4 insertions(+), 140 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 672772c467716..fb0bb87172a8e 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -7234,14 +7234,6 @@ ], "sqlState" : "42601" }, - "TRANSACTION_MULTI_CATALOG_NOT_SUPPORTED" : { - "message" : [ - "All tables participating in a transactional operation must belong to the same catalog.", - "The write target belongs to catalog '', but the query also references", - "table(s) from catalog(s): ." - ], - "sqlState" : "0A000" - }, "TRANSFORM_WITH_STATE_SCHEMA_MUST_BE_NULLABLE" : { "message" : [ "If Avro encoding is enabled, all the fields in the schema for column family must be nullable", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index e702b829e220c..8769d1c4e4ffa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -24,7 +24,6 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.{ CatalogTable, - CatalogTableType, TemporaryViewRelation, UnresolvedCatalogRelation } @@ -296,11 +295,8 @@ class RelationResolution( // we don't share-cache views. val table: Option[Table] = tableOrView.filter { case t: MetadataTable if t.getTableInfo.isInstanceOf[ViewInfo] => false - case t: V1Table if t.catalogTable.tableType == CatalogTableType.VIEW => false case _ => true } - // Enforce single-catalog isolation for all table loads inside a transaction. - table.foreach(_ => catalogManager.validateCatalogForTableLoad(catalog)) val sharedRelationCacheMatch = for { t <- table diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala index a7070557f18d9..8ee64e32376fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TransactionalWr import org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.allowInvokingTransformsInAnalyzer import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog} -import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation /** @@ -32,10 +31,6 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation * This forces re-resolution of those relations against the transaction's catalog, which * intercepts [[TableCatalog#loadTable]] calls to track which tables are read as part of * the transaction. - * - * Note: if any resolved relation is loaded from a different catalog we throw an error. - * We only support single catalog transactions. This allows us to track all tables that - * participate in the transaction through a single point. */ class UnresolveRelationsInTransaction(val catalogManager: CatalogManager) extends Rule[LogicalPlan] with LookupCatalog { @@ -60,12 +55,7 @@ class UnresolveRelationsInTransaction(val catalogManager: CatalogManager) plan: LogicalPlan, catalog: CatalogPlugin): LogicalPlan = { plan.transform { - case r: DataSourceV2Relation if r.identifier.isDefined => - if (!isLoadedFromCatalog(r, catalog)) { - throw QueryCompilationErrors.transactionMultiCatalogNotSupportedError( - catalog.name(), r.catalog.get.name()) - } - + case r: DataSourceV2Relation if isLoadedFromCatalog(r, catalog) => V2TableReference.createForTransaction(r) } } @@ -73,6 +63,6 @@ class UnresolveRelationsInTransaction(val catalogManager: CatalogManager) private def isLoadedFromCatalog( relation: DataSourceV2Relation, catalog: CatalogPlugin): Boolean = { - relation.catalog.exists(_.name == catalog.name) + relation.catalog.exists(_.name == catalog.name) && relation.identifier.isDefined } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index c0a8438145448..01f900e22537c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -89,12 +89,6 @@ class CatalogManager( def withTransaction(transaction: Transaction): CatalogManager = new TransactionAwareCatalogManager(this, transaction) - /** - * Called to validate catalog for loading a table. Overridden by - * [[TransactionAwareCatalogManager]] to enforce single-catalog isolation per transaction. - */ - def validateCatalogForTableLoad(catalog: CatalogPlugin): Unit = {} - /** * Returns the catalog name and connector-canonicalized identifier for a multipart SQL name * whose head is a [[SupportsCatalogOptions]] data source format. Returns None if the format diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala index 10fea1ca1fb61..76ccd7480d97e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.connector.catalog import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.catalog.TempVariableManager import org.apache.spark.sql.connector.catalog.transactions.Transaction -import org.apache.spark.sql.errors.QueryCompilationErrors /** * A [[CatalogManager]] decorator that redirects catalog lookups to the transaction's catalog @@ -50,17 +49,6 @@ private[sql] class TransactionAwareCatalogManager( if (txn.catalog.name() == resolved.name()) txn.catalog else resolved } - /** - * Validates that a table loaded during relation resolution belongs to the transaction catalog. - * All table loads during a transaction must come from the same catalog to ensure isolation. - */ - override def validateCatalogForTableLoad(catalog: CatalogPlugin): Unit = { - if (catalog.name() != txn.catalog.name()) { - throw QueryCompilationErrors.transactionMultiCatalogNotSupportedError( - txn.catalog.name(), catalog.name()) - } - } - override def currentCatalog: CatalogPlugin = { val c = delegate.currentCatalog if (txn.catalog.name() == c.name()) txn.catalog else c diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index c5400f5c59e90..9b899867a9e37 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -1636,16 +1636,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "identifier" -> toSQLId(identifier))) } - def transactionMultiCatalogNotSupportedError( - txnCatalog: String, - foreignCatalog: String): Throwable = { - new AnalysisException( - errorClass = "TRANSACTION_MULTI_CATALOG_NOT_SUPPORTED", - messageParameters = Map( - "txnCatalog" -> txnCatalog, - "foreignCatalogs" -> foreignCatalog)) - } - def namespaceAlreadyExistsError(namespace: Array[String]): Throwable = { new NamespaceAlreadyExistsException(namespace) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AppendDataTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AppendDataTransactionSuite.scala index 372cde7443102..aef9c65550fc4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AppendDataTransactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AppendDataTransactionSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.Row -import org.apache.spark.sql.connector.catalog.{Aborted, Committed, InMemoryRowLevelOperationTableCatalog} +import org.apache.spark.sql.connector.catalog.{Aborted, Committed} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode @@ -474,27 +474,6 @@ class AppendDataTransactionSuite extends RowLevelOperationSuiteBase { Row(12, 888, "hr", false))) } - test("multi-catalog query in transaction is rejected") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |""".stripMargin) - - withSQLConf("spark.sql.catalog.cat2" -> - classOf[InMemoryRowLevelOperationTableCatalog].getName) { - sql("CREATE TABLE cat2.ns1.source (pk INT NOT NULL, salary INT, dep STRING)") - sql("INSERT INTO cat2.ns1.source VALUES (10, 999, 'hr')") - - val e = intercept[AnalysisException] { - sql(s"INSERT INTO $tableNameAsString SELECT * FROM cat2.ns1.source") - } - - checkError(e, "TRANSACTION_MULTI_CATALOG_NOT_SUPPORTED", - parameters = Map("txnCatalog" -> "cat", "foreignCatalogs" -> "cat2")) - assert(catalog.lastTransaction.currentState === Aborted) - assert(catalog.lastTransaction.isClosed) - } - } - test("SQL INSERT WITH SCHEMA EVOLUTION analysis failure aborts transaction") { createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", """{ "pk": 1, "salary": 100, "dep": "hr" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala index f0bd943c94cfb..d2dfae13b511c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.{sources, Column, Row} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.classic.MergeIntoWriter -import org.apache.spark.sql.connector.catalog.{Aborted, Committed, InMemoryRowLevelOperationTableCatalog} +import org.apache.spark.sql.connector.catalog.{Aborted, Committed} import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.TableInfo import org.apache.spark.sql.functions._ @@ -1184,36 +1184,6 @@ class MergeIntoDataFrameSuite extends RowLevelOperationSuiteBase { } } - test("merge with pre-analyzed source from a different catalog is rejected") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |""".stripMargin) - - withSQLConf("spark.sql.catalog.cat2" -> - classOf[InMemoryRowLevelOperationTableCatalog].getName) { - sql("CREATE TABLE cat2.ns1.source (pk INT NOT NULL, salary INT, dep STRING)") - sql("INSERT INTO cat2.ns1.source VALUES (10, 999, 'hr')") - - val sourceDF = spark.table("cat2.ns1.source").as("source") - sourceDF.queryExecution.assertAnalyzed() - - val e = intercept[AnalysisException] { - sourceDF - .mergeInto(tableNameAsString, $"source.pk" === targetTableCol("pk")) - .whenMatched() - .updateAll() - .whenNotMatched() - .insertAll() - .merge() - } - - checkError(e, "TRANSACTION_MULTI_CATALOG_NOT_SUPPORTED", - parameters = Map("txnCatalog" -> "cat", "foreignCatalogs" -> "cat2")) - assert(catalog.lastTransaction.currentState === Aborted) - assert(catalog.lastTransaction.isClosed) - } - } - test("SPARK-54444: any schema changes after analysis are prohibited") { import org.apache.spark.sql.connector.catalog.Column val sourceTable = "cat.ns1.source_table" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala index bb88a781e4815..c1fdc7643ef5a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala @@ -37,7 +37,6 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { private val tablePath = "`/path/to/t`" private val tablePathWithFormat = "pathformat.`/path/to/t`" - private val tablePathWithFormat2 = "pathformat2.`/path/to/t`" override def beforeEach(): Unit = { super.beforeEach() @@ -229,38 +228,4 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Nil) } - test("path-based write with source from different catalog is rejected") { - createPathTable(tablePathWithFormat) - // cat is a different catalog from spark_catalog (the path-based catalog). - sql("CREATE TABLE cat.ns1.source (id INT, data STRING)") - - val e = intercept[AnalysisException] { - sql(s"INSERT INTO $tablePathWithFormat SELECT * FROM cat.ns1.source") - } - checkError(e, "TRANSACTION_MULTI_CATALOG_NOT_SUPPORTED", - parameters = Map("txnCatalog" -> "spark_catalog", "foreignCatalogs" -> "cat")) - assert(catalog.lastTransaction.currentState === Aborted) - assert(catalog.lastTransaction.isClosed) - } - - test("path-based write with source from session-config-routed catalog is rejected") { - withSQLConf( - "spark.sql.catalog.txncat" -> classOf[InMemoryRowLevelOperationTableCatalog].getName, - "spark.datasource.pathformat2.catalog" -> "txncat") { - // pathformat2 routes to txncat; create the source there. - createPathTable(tablePathWithFormat2) - sql(s"INSERT INTO $tablePathWithFormat2 VALUES (1, 'a')") - - // pathformat routes to the session catalog (default extractCatalog). - createPathTable(tablePathWithFormat) - - val e = intercept[AnalysisException] { - sql(s"INSERT INTO $tablePathWithFormat SELECT * FROM $tablePathWithFormat2") - } - checkError(e, "TRANSACTION_MULTI_CATALOG_NOT_SUPPORTED", - parameters = Map("txnCatalog" -> "spark_catalog", "foreignCatalogs" -> "txncat")) - assert(catalog.lastTransaction.currentState === Aborted) - assert(catalog.lastTransaction.isClosed) - } - } } From 000560c846c1267da15e4e45b5161559b95d129f Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Thu, 21 May 2026 13:41:58 +0000 Subject: [PATCH 13/18] Improvements --- .../spark/sql/classic/DataStreamWriter.scala | 14 ++---- .../datasources/v2/DataSourceV2Utils.scala | 47 ++++++++++++++----- .../internal/BaseSessionStateBuilder.scala | 16 +++---- 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala index 3e27605940383..2ec100ba0f59b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala @@ -46,7 +46,6 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger} -import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils @@ -253,19 +252,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D // file source v2 does not support streaming yet. classOf[FileDataSourceV2].isAssignableFrom(cls) - val optionsWithPath = if (path.isEmpty) { - extraOptions - } else { - extraOptions + ("path" -> path.get) - } + val optionsWithPath = DataSourceV2Utils.getOptionsWithPaths(extraOptions, path.toSeq: _*) val sink = if (classOf[TableProvider].isAssignableFrom(cls) && !useV1Source) { val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] - val sessionOptions = DataSourceV2Utils.extractSessionConfigs( - source = provider, conf = ds.sparkSession.sessionState.conf) - val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++ - optionsWithPath.originalMap - val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) + val dsOptions = DataSourceV2Utils.buildDsOptions( + provider, ds.sparkSession.sessionState.conf, optionsWithPath) // If the source accepts external table metadata, here we pass the schema of input query // to `getTable`. This is for avoiding schema inference, which can be very expensive. // If the query schema is not compatible with the existing data, the behavior is undefined. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index a3b5c5aeb7995..39e8d1cc6737d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.TimeTravelSpec import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSupport, StagedTable, StagingTableCatalog, SupportsCatalogOptions, SupportsRead, Table, TableProvider} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, SessionConfigSupport, StagedTable, StagingTableCatalog, SupportsCatalogOptions, SupportsRead, Table, TableProvider} import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SQLExecution @@ -99,6 +99,36 @@ private[sql] object DataSourceV2Utils extends Logging { } } + /** + * Builds the [[CaseInsensitiveStringMap]] passed to a v2 [[TableProvider]]: session configs + * extracted from the provider, merged with the caller-supplied options-with-path map. + */ + def buildDsOptions( + provider: TableProvider, + conf: SQLConf, + optionsWithPath: CaseInsensitiveMap[String]): CaseInsensitiveStringMap = { + val sessionOptions = extractSessionConfigs(provider, conf) + val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++ + optionsWithPath.originalMap + new CaseInsensitiveStringMap(finalOptions.asJava) + } + + /** + * Extracts the catalog name and connector-canonical identifier from a + * [[SupportsCatalogOptions]] provider. Shared by all SCO entry points (DataFrame reader, SQL + * multipart-name resolution) so they observe identical null-handling semantics: + * `extractCatalog` returning null falls back to the session catalog, matching + * [[CatalogV2Util.getTableProviderCatalog]]. + */ + def extractCatalogAndIdentifier( + provider: SupportsCatalogOptions, + dsOptions: CaseInsensitiveStringMap): (String, Identifier) = { + val ident = provider.extractIdentifier(dsOptions) + val catalogName = Option(provider.extractCatalog(dsOptions)) + .getOrElse(CatalogManager.SESSION_CATALOG_NAME) + (catalogName, ident) + } + def loadV2Source( sparkSession: SparkSession, provider: TableProvider, @@ -108,23 +138,16 @@ private[sql] object DataSourceV2Utils extends Logging { paths: String*): Option[LogicalPlan] = { val catalogManager = sparkSession.sessionState.catalogManager val conf = sparkSession.sessionState.conf - val sessionOptions = DataSourceV2Utils.extractSessionConfigs(provider, conf) - val optionsWithPath = getOptionsWithPaths(extraOptions, paths: _*) - - val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++ - optionsWithPath.originalMap - val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) + val dsOptions = buildDsOptions(provider, conf, optionsWithPath) val (table, catalog, ident, timeTravelSpec) = provider match { case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty => throw new IllegalArgumentException( s"$source does not support user specified schema. Please don't specify the schema.") case hasCatalog: SupportsCatalogOptions => - val ident = hasCatalog.extractIdentifier(dsOptions) - val catalog = CatalogV2Util.getTableProviderCatalog( - hasCatalog, - catalogManager, - dsOptions) + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val (catalogName, ident) = extractCatalogAndIdentifier(hasCatalog, dsOptions) + val catalog = catalogManager.catalog(catalogName).asTableCatalog val version = hasCatalog.extractTimeTravelVersion(dsOptions) val timestamp = hasCatalog.extractTimeTravelTimestamp(dsOptions) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index e06ec1a0e2ef9..634a8cd44ca1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.sql.internal -import scala.jdk.CollectionConverters._ - import org.apache.spark.annotation.Unstable import org.apache.spark.sql.{DataSourceRegistration, ExperimentalMethods, SparkSessionExtensions, UDTFRegistration} import org.apache.spark.sql.artifact.ArtifactManager @@ -30,6 +28,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.classic.{SparkSession, Strategy, StreamingCheckpointManager, StreamingQueryManager, UDFRegistration} import org.apache.spark.sql.connector.catalog.{CatalogManager, DataSourceCatalogResolver, SupportsCatalogOptions} import org.apache.spark.sql.errors.QueryCompilationErrors @@ -44,7 +43,7 @@ import org.apache.spark.sql.execution.externalUDF.{ClassicExternalUDFPlanner, ExternalUDFPlanner, UnifiedExternalUDFPlanner} import org.apache.spark.sql.execution.streaming.runtime.ResolveWriteToStream import org.apache.spark.sql.expressions.UserDefinedAggregateFunction -import org.apache.spark.sql.util.{CaseInsensitiveStringMap, ExecutionListenerManager} +import org.apache.spark.sql.util.ExecutionListenerManager /** * Builder class that coordinates construction of a new [[SessionState]]. @@ -169,13 +168,10 @@ abstract class BaseSessionStateBuilder( try { DataSource.lookupDataSourceV2(nameParts.head, conf).flatMap { case sco: SupportsCatalogOptions => - // Pass the full multipart name as the `path` option so the connector can do its - // own canonicalization (strip its format prefix, normalize, etc.). Session configs - // are passed through as well. - val sessionOpts = DataSourceV2Utils.extractSessionConfigs(sco, conf) - val opts = new CaseInsensitiveStringMap( - (sessionOpts + ("path" -> nameParts.mkString("."))).asJava) - Some((sco.extractCatalog(opts), sco.extractIdentifier(opts))) + val optionsWithPath = DataSourceV2Utils.getOptionsWithPaths( + CaseInsensitiveMap(Map.empty), nameParts.tail.mkString(".")) + val dsOptions = DataSourceV2Utils.buildDsOptions(sco, conf, optionsWithPath) + Some(DataSourceV2Utils.extractCatalogAndIdentifier(sco, dsOptions)) case _ => None } } catch { From 72acd85e282b663bed40973dfa74222bfd06cb18 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Thu, 21 May 2026 13:44:27 +0000 Subject: [PATCH 14/18] Fix FakeV2Provider --- .../org/apache/spark/sql/connector/FakeV2Provider.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala index 97a9fd06c9f17..a486544421a78 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala @@ -108,10 +108,8 @@ class FakePathBasedSource override def shortName(): String = "pathformat" - // Strip our own format prefix from the multipart path and return the rest under our - // format-name namespace. override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = - Identifier.of(Array(shortName()), options.get("path").stripPrefix(s"${shortName()}.")) + Identifier.of(Array(shortName()), options.get("path")) } /** @@ -131,8 +129,6 @@ class FakePathBasedSourceWithSessionConfig override def extractCatalog(options: CaseInsensitiveStringMap): String = options.get("catalog") - // Strip our own format prefix from the multipart path and return the rest under our - // format-name namespace. override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = - Identifier.of(Array(shortName()), options.get("path").stripPrefix(s"${shortName()}.")) + Identifier.of(Array(shortName()), options.get("path")) } From 8631ba8620acc1de17f2189906eddcd6ac21dfd8 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Thu, 21 May 2026 13:45:05 +0000 Subject: [PATCH 15/18] Add PathBasedTableSuite --- .../sql/connector/PathBasedTableSuite.scala | 191 ++++++++++++++++++ 1 file changed, 191 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala new file mode 100644 index 0000000000000..75f0437f19a90 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog +import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Non-transactional tests for SQL resolution of path-based tables surfaced by a + * [[org.apache.spark.sql.connector.catalog.SupportsCatalogOptions]] data source + * (e.g. `pathformat.`/path/to/t``). Covers reads, DDL, CREATE/REPLACE, regression for v1 + * file-format direct queries, and the `runSQLOnFile` gate. Transactional behavior is + * covered separately in [[PathBasedTableTransactionSuite]]. + */ +class PathBasedTableSuite extends QueryTest with SharedSparkSession { + + import testImplicits._ + + // FakePathBasedSource rewrites `pathformat.\`/path/to/t\`` to the session catalog with + // Identifier(ns = ["pathformat"], name = "/path/to/t"). InMemoryTableCatalog accepts + // arbitrary namespace/name shapes, so we plug it in as the v2 session catalog. + private val tablePath = "pathformat.`/path/to/t`" + + override def beforeEach(): Unit = { + super.beforeEach() + spark.conf.set( + V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[InMemoryTableCatalog].getName) + } + + override def afterEach(): Unit = { + // SharedSparkSession reuses one SparkSession across tests, so the in-memory catalog's + // table map would persist between tests. Reset clears registered catalogs so each test + // sees a fresh session catalog instance. + spark.sessionState.catalogManager.reset() + spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) + super.afterEach() + } + + test("CREATE then SELECT on path-based table") { + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')") + checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil) + } + + test("DESCRIBE TABLE resolves path-based table") { + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + checkAnswer( + sql(s"DESCRIBE TABLE $tablePath").select("col_name", "data_type"), + Row("id", "int") :: Row("data", "string") :: Nil) + } + + test("ALTER TABLE on path-based table") { + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + sql(s"ALTER TABLE $tablePath ADD COLUMNS (extra DOUBLE)") + val columns = sql(s"DESCRIBE TABLE $tablePath").collect().map(_.getString(0)).toSet + assert(Set("id", "data", "extra").subsetOf(columns)) + } + + test("DROP TABLE on path-based table") { + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + sql(s"DROP TABLE $tablePath") + intercept[AnalysisException] { + sql(s"SELECT * FROM $tablePath") + } + } + + test("JOIN across two path-based tables") { + val a = "pathformat.`/a`" + val b = "pathformat.`/b`" + sql(s"CREATE TABLE $a (id INT, x STRING)") + sql(s"CREATE TABLE $b (id INT, y STRING)") + sql(s"INSERT INTO $a VALUES (1, 'x1'), (2, 'x2')") + sql(s"INSERT INTO $b VALUES (1, 'y1'), (2, 'y2')") + checkAnswer( + sql(s"SELECT a.id, a.x, b.y FROM $a a JOIN $b b ON a.id = b.id ORDER BY a.id"), + Row(1, "x1", "y1") :: Row(2, "x2", "y2") :: Nil) + } + + test("session-config catalog routes non-transactional reads") { + val target = "tgt" + withSQLConf( + s"spark.sql.catalog.$target" -> classOf[InMemoryTableCatalog].getName, + s"spark.datasource.pathformat2.catalog" -> target) { + sql("CREATE TABLE pathformat2.`/p` (id INT, data STRING)") + sql("INSERT INTO pathformat2.`/p` VALUES (1, 'a')") + checkAnswer(spark.table("pathformat2.`/p`"), Row(1, "a") :: Nil) + val tgt = spark.sessionState.catalogManager.catalog(target) + .asInstanceOf[InMemoryTableCatalog] + assert(tgt.listTables(Array("pathformat2")).map(_.name()).contains("/p")) + } + } + + test("regression: v1 file format direct query still resolves") { + withTempDir { dir => + val path = new java.io.File(dir, "p.parquet").getCanonicalPath + Seq((1, "a"), (2, "b")).toDF("id", "data").write.parquet(path) + checkAnswer(sql(s"SELECT * FROM parquet.`$path`"), Row(1, "a") :: Row(2, "b") :: Nil) + } + } + + test("unknown format head produces table-not-found, not ClassNotFoundException") { + val e = intercept[AnalysisException] { + sql("SELECT * FROM unknown_fmt.`/path/to/t`") + } + // The format head is not a registered data source and not a catalog. Resolution + // falls through to a normal table-not-found error. + assert(e.getCondition == "TABLE_OR_VIEW_NOT_FOUND" || + e.getMessage.toLowerCase.contains("not found")) + } + + test("VERSION AS OF on path-based table") { + val base = "pathformat.`/p`" + // InMemoryTableCatalog implements time travel by appending the version string to the + // identifier name (see loadTable(ident, version) — it looks up name + version). + val versioned = "pathformat.`/pv1`" + sql(s"CREATE TABLE $base (id INT)") + sql(s"CREATE TABLE $versioned (id INT)") + sql(s"INSERT INTO $base VALUES (1)") + sql(s"INSERT INTO $versioned VALUES (2)") + checkAnswer(sql(s"SELECT * FROM $base VERSION AS OF 'v1'"), Row(2)) + } + + test("SCO precedence: data source name wins over same-named catalog") { + // Register a catalog under the same name as the SCO data source short name. + // Resolution should still route through the SCO resolver, i.e. the table is + // created under the session catalog (`spark_catalog`), not under "pathformat". + withSQLConf("spark.sql.catalog.pathformat" -> classOf[InMemoryTableCatalog].getName) { + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + sql(s"INSERT INTO $tablePath VALUES (1, 'a')") + checkAnswer(spark.table(tablePath), Row(1, "a") :: Nil) + + // Table lives in the session catalog under namespace=["pathformat"], not in the + // catalog registered as "pathformat". + val sessionCat = spark.sessionState.catalogManager.v2SessionCatalog + .asInstanceOf[InMemoryTableCatalog] + assert(sessionCat.listTables(Array("pathformat")).map(_.name()).contains("/path/to/t")) + val homonymCat = spark.sessionState.catalogManager.catalog("pathformat") + .asInstanceOf[InMemoryTableCatalog] + assert(homonymCat.listTables(Array.empty).isEmpty) + } + } + + test("CREATE TABLE AS SELECT on path-based table") { + sql("CREATE TABLE source (id INT, data STRING)") + sql("INSERT INTO source VALUES (1, 'a'), (2, 'b')") + sql(s"CREATE TABLE $tablePath AS SELECT * FROM source") + checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil) + } + + test("REPLACE TABLE AS SELECT on path-based table") { + sql("CREATE TABLE source (id INT, data STRING)") + sql("INSERT INTO source VALUES (1, 'a'), (2, 'b'), (3, 'c')") + sql(s"CREATE TABLE $tablePath AS SELECT * FROM source") + sql(s"REPLACE TABLE $tablePath AS SELECT id FROM source WHERE id > 1") + checkAnswer(spark.table(tablePath), Row(2) :: Row(3) :: Nil) + } + + test("INSERT OVERWRITE on path-based table") { + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')") + sql(s"INSERT OVERWRITE $tablePath VALUES (9, 'z')") + checkAnswer(spark.table(tablePath), Row(9, "z") :: Nil) + } + + test("DataFrame API regression: read still resolves via SCO") { + // Create via SQL (exercises the new LookupCatalog SCO seam), read via DataFrame + // (exercises the pre-existing DataFrameReader SCO path in DataSourceV2Utils). + // Both paths should land on the same Identifier in the session catalog. + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')") + val df = spark.read.format("pathformat").load("/path/to/t") + checkAnswer(df, Row(1, "a") :: Row(2, "b") :: Nil) + } +} From ae8f4a8bdebd2ceaba09f0cb69f53129b6f14b9e Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Fri, 22 May 2026 10:42:41 +0000 Subject: [PATCH 16/18] Improvements --- .../datasources/v2/DataSourceV2Utils.scala | 26 ++++++++++++++++++- .../internal/BaseSessionStateBuilder.scala | 19 ++------------ .../sql/connector/PathBasedTableSuite.scala | 5 ++-- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index 39e8d1cc6737d..d2dfb1661e059 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.TimeTravelSpec import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, SessionConfigSupport, StagedTable, StagingTableCatalog, SupportsCatalogOptions, SupportsRead, Table, TableProvider} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, DataSourceCatalogResolver, Identifier, SessionConfigSupport, StagedTable, StagingTableCatalog, SupportsCatalogOptions, SupportsRead, Table, TableProvider} import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SQLExecution @@ -129,6 +129,30 @@ private[sql] object DataSourceV2Utils extends Logging { (catalogName, ident) } + /** + * Resolver bound to a session [[SQLConf]] that maps a multipart SQL identifier + * (e.g. `pathformat.\`/path/to/t\``) to a `(catalogName, identifier)` pair when the head + * names a registered [[SupportsCatalogOptions]] data source. Returns `None` for non-SCO + * sources or unknown format heads, letting the caller fall back to standard catalog + * resolution. + */ + def supportsCatalogOptionsResolver(conf: SQLConf): DataSourceCatalogResolver = + (nameParts: Seq[String]) => + try { + DataSource.lookupDataSourceV2(nameParts.head, conf).flatMap { + case sco: SupportsCatalogOptions => + val optionsWithPath = getOptionsWithPaths( + CaseInsensitiveMap(Map.empty), nameParts.tail.mkString(".")) + val dsOptions = buildDsOptions(sco, conf, optionsWithPath) + Some(extractCatalogAndIdentifier(sco, dsOptions)) + case _ => None + } + } catch { + // The format name is not a registered data source. Fall through and let the caller + // treat it as a catalog/namespace name. + case _: ClassNotFoundException => None + } + def loadV2Source( sparkSession: SparkSession, provider: TableProvider, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 634a8cd44ca1b..78e0a1bd29587 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -28,9 +28,8 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.classic.{SparkSession, Strategy, StreamingCheckpointManager, StreamingQueryManager, UDFRegistration} -import org.apache.spark.sql.connector.catalog.{CatalogManager, DataSourceCatalogResolver, SupportsCatalogOptions} +import org.apache.spark.sql.connector.catalog.{CatalogManager, DataSourceCatalogResolver} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser} import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder @@ -164,21 +163,7 @@ abstract class BaseSessionStateBuilder( protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog) protected lazy val dataSourceCatalogResolver: DataSourceCatalogResolver = - (nameParts: Seq[String]) => - try { - DataSource.lookupDataSourceV2(nameParts.head, conf).flatMap { - case sco: SupportsCatalogOptions => - val optionsWithPath = DataSourceV2Utils.getOptionsWithPaths( - CaseInsensitiveMap(Map.empty), nameParts.tail.mkString(".")) - val dsOptions = DataSourceV2Utils.buildDsOptions(sco, conf, optionsWithPath) - Some(DataSourceV2Utils.extractCatalogAndIdentifier(sco, dsOptions)) - case _ => None - } - } catch { - // The format name is not a registered data source. Fall through and let the caller - // treat it as a catalog/namespace name. - case _: ClassNotFoundException => None - } + DataSourceV2Utils.supportsCatalogOptionsResolver(conf) protected lazy val catalogManager = { val cm = new CatalogManager(v2SessionCatalog, catalog, dataSourceCatalogResolver) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala index 75f0437f19a90..5f7a42c3e6e68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala @@ -121,14 +121,13 @@ class PathBasedTableSuite extends QueryTest with SharedSparkSession { } // The format head is not a registered data source and not a catalog. Resolution // falls through to a normal table-not-found error. - assert(e.getCondition == "TABLE_OR_VIEW_NOT_FOUND" || - e.getMessage.toLowerCase.contains("not found")) + assert(e.getCondition == "TABLE_OR_VIEW_NOT_FOUND") } test("VERSION AS OF on path-based table") { val base = "pathformat.`/p`" // InMemoryTableCatalog implements time travel by appending the version string to the - // identifier name (see loadTable(ident, version) — it looks up name + version). + // identifier name (see loadTable(ident, version) - it looks up name + version). val versioned = "pathformat.`/pv1`" sql(s"CREATE TABLE $base (id INT)") sql(s"CREATE TABLE $versioned (id INT)") From 88cc8d6fb9c2a4aab19ed6e68c9d14b2f0875a6f Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Fri, 22 May 2026 13:07:33 +0000 Subject: [PATCH 17/18] Change resolution precedence + test improvements --- .../sql/connector/catalog/LookupCatalog.scala | 25 ++-- .../spark/sql/connector/FakeV2Provider.scala | 20 ++- .../sql/connector/PathBasedTableSuite.scala | 116 +++++++++++++----- .../PathBasedTableTransactionSuite.scala | 65 +++++++--- 4 files changed, 162 insertions(+), 64 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index aae5557bab326..34bfbe4d39198 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -121,17 +121,9 @@ private[sql] trait LookupCatalog extends Logging { // this custom catalog can't be accessed. Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier)) } else { - // Path-based data sources (e.g. `pathformat2.'/path/to/t'`) whose format declares a - // catalog via SupportsCatalogOptions are routed to that catalog. Both the catalog and - // the canonical identifier come from the connector. - val (catalogName, ident) = - Option(catalogManager.catalogAndIdentForDataSource(nameParts)).flatten match { - case Some((catName, providerIdent)) => (catName, providerIdent) - case None => (nameParts.head, nameParts.tail.asIdentifier) - } - try { - val catalog = catalogManager.catalog(catalogName) + val catalog = catalogManager.catalog(nameParts.head) + val ident = nameParts.tail.asIdentifier if (CatalogV2Util.isSessionCatalog(catalog)) { // Reject only when namespace is empty (e.g. spark_catalog.t with no database). // Allow multi-part namespace for metadata tables (e.g. default.table.snapshots). @@ -143,7 +135,18 @@ private[sql] trait LookupCatalog extends Logging { Some((catalog, ident)) } catch { case _: CatalogNotFoundException => - Some((currentCatalog, nameParts.asIdentifier)) + // No catalog matched. As a fallback, try path-based data sources: + // formats implementing SupportsCatalogOptions (e.g. `pathformat.`/path/to/t``) + // route to the catalog the connector designates. If no SCO format claims the + // identifier head, fall through to currentCatalog and let later analysis raise + // table-not-found. This matches the v1 file-format precedence (catalog first, + // path-based as fallback). + Option(catalogManager.catalogAndIdentForDataSource(nameParts)).flatten match { + case Some((catName, providerIdent)) => + Some((catalogManager.catalog(catName), providerIdent)) + case None => + Some((currentCatalog, nameParts.asIdentifier)) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala index a486544421a78..031dc0ada21e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.connector import java.util +import java.util.Optional import org.apache.spark.sql.connector.catalog.{Identifier, SessionConfigSupport, SupportsCatalogOptions, SupportsV1OverwriteWithSaveAsTable, Table, TableProvider} import org.apache.spark.sql.connector.expressions.Transform @@ -96,10 +97,11 @@ class FakeV2ProviderWithV1SaveAsTableOverwriteWriteOptionDisabled } /** - * Simulates a path-based connector (e.g. Delta) that implements [[SupportsCatalogOptions]] - * to route `pathformat.\`/path/to/t\`` SQL identifiers to the session catalog. We rely on - * the default [[SupportsCatalogOptions#extractCatalog]] which returns - * [[org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME]]. + * Simulates a path-based connector that implements [[SupportsCatalogOptions]] and routes + * `pathformat.\`/path/to/t\`` SQL identifiers to a dedicated catalog (`pathformat_cat`). + * Tests register that catalog and assert against it so the SCO seam is exercised + * unambiguously: without SCO, `CatalogAndIdentifier` falls back to the current catalog + * (session catalog) and the target catalog stays empty. */ class FakePathBasedSource extends FakeV2ProviderWithCustomSchema @@ -108,8 +110,18 @@ class FakePathBasedSource override def shortName(): String = "pathformat" + override def extractCatalog(options: CaseInsensitiveStringMap): String = + FakePathBasedSource.CATALOG_NAME + override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = Identifier.of(Array(shortName()), options.get("path")) + + override def extractTimeTravelVersion(options: CaseInsensitiveStringMap): Optional[String] = + Optional.ofNullable(options.get("versionAsOf")) +} + +object FakePathBasedSource { + val CATALOG_NAME: String = "pathformat_cat" } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala index 5f7a42c3e6e68..e1ac1bcf38cc6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala @@ -19,37 +19,40 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog -import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.SharedSparkSession /** * Non-transactional tests for SQL resolution of path-based tables surfaced by a * [[org.apache.spark.sql.connector.catalog.SupportsCatalogOptions]] data source - * (e.g. `pathformat.`/path/to/t``). Covers reads, DDL, CREATE/REPLACE, regression for v1 - * file-format direct queries, and the `runSQLOnFile` gate. Transactional behavior is - * covered separately in [[PathBasedTableTransactionSuite]]. + * (e.g. `pathformat.`/path/to/t``). [[FakePathBasedSource]] routes resolution to a + * dedicated `pathformat_cat` catalog rather than the session catalog, so assertions + * against that catalog unambiguously confirm the SCO seam fired — without SCO, + * `CatalogAndIdentifier`'s fallback lands in the (default) session catalog and the + * named catalog stays empty. */ class PathBasedTableSuite extends QueryTest with SharedSparkSession { import testImplicits._ - // FakePathBasedSource rewrites `pathformat.\`/path/to/t\`` to the session catalog with - // Identifier(ns = ["pathformat"], name = "/path/to/t"). InMemoryTableCatalog accepts - // arbitrary namespace/name shapes, so we plug it in as the v2 session catalog. private val tablePath = "pathformat.`/path/to/t`" + private def pathformatCat: InMemoryTableCatalog = + spark.sessionState.catalogManager.catalog(FakePathBasedSource.CATALOG_NAME) + .asInstanceOf[InMemoryTableCatalog] + override def beforeEach(): Unit = { super.beforeEach() spark.conf.set( - V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[InMemoryTableCatalog].getName) + s"spark.sql.catalog.${FakePathBasedSource.CATALOG_NAME}", + classOf[InMemoryTableCatalog].getName) } override def afterEach(): Unit = { - // SharedSparkSession reuses one SparkSession across tests, so the in-memory catalog's - // table map would persist between tests. Reset clears registered catalogs so each test - // sees a fresh session catalog instance. + // SharedSparkSession reuses one SparkSession across tests. `reset()` drops registered + // non-session catalogs (including pathformat_cat), so the next test starts with a + // fresh InMemoryTableCatalog instance. spark.sessionState.catalogManager.reset() - spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) + spark.conf.unset(s"spark.sql.catalog.${FakePathBasedSource.CATALOG_NAME}") super.afterEach() } @@ -57,6 +60,9 @@ class PathBasedTableSuite extends QueryTest with SharedSparkSession { sql(s"CREATE TABLE $tablePath (id INT, data STRING)") sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')") checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil) + // The SCO resolver routed creation/reads to pathformat_cat. Without the seam, + // CREATE would fall through to the (default) session catalog and fail. + assert(pathformatCat.listTables(Array("pathformat")).map(_.name()).contains("/path/to/t")) } test("DESCRIBE TABLE resolves path-based table") { @@ -136,39 +142,43 @@ class PathBasedTableSuite extends QueryTest with SharedSparkSession { checkAnswer(sql(s"SELECT * FROM $base VERSION AS OF 'v1'"), Row(2)) } - test("SCO precedence: data source name wins over same-named catalog") { - // Register a catalog under the same name as the SCO data source short name. - // Resolution should still route through the SCO resolver, i.e. the table is - // created under the session catalog (`spark_catalog`), not under "pathformat". + test("catalog precedence: same-named catalog wins over SCO data source") { + // Register a catalog under the same name as the SCO data source short name. SQL + // resolution should route to the catalog; the SCO resolver is consulted only when + // no catalog claims the head, matching v1 file-format precedence (ResolveSQLOnFile) + // and Delta's ResolveDeltaPathTable extension. withSQLConf("spark.sql.catalog.pathformat" -> classOf[InMemoryTableCatalog].getName) { sql(s"CREATE TABLE $tablePath (id INT, data STRING)") sql(s"INSERT INTO $tablePath VALUES (1, 'a')") checkAnswer(spark.table(tablePath), Row(1, "a") :: Nil) - // Table lives in the session catalog under namespace=["pathformat"], not in the - // catalog registered as "pathformat". - val sessionCat = spark.sessionState.catalogManager.v2SessionCatalog - .asInstanceOf[InMemoryTableCatalog] - assert(sessionCat.listTables(Array("pathformat")).map(_.name()).contains("/path/to/t")) + // Table lives in the homonym catalog at identifier (ns=[], name="/path/to/t"), + // and the SCO-targeted catalog (pathformat_cat) is untouched because the SCO + // resolver was never consulted. val homonymCat = spark.sessionState.catalogManager.catalog("pathformat") .asInstanceOf[InMemoryTableCatalog] - assert(homonymCat.listTables(Array.empty).isEmpty) + assert(homonymCat.listTables(Array.empty).map(_.name()).contains("/path/to/t")) + assert(!pathformatCat.namespaceExists(Array("pathformat"))) } } test("CREATE TABLE AS SELECT on path-based table") { - sql("CREATE TABLE source (id INT, data STRING)") - sql("INSERT INTO source VALUES (1, 'a'), (2, 'b')") - sql(s"CREATE TABLE $tablePath AS SELECT * FROM source") - checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil) + withTable("source") { + sql("CREATE TABLE source (id INT, data STRING)") + sql("INSERT INTO source VALUES (1, 'a'), (2, 'b')") + sql(s"CREATE TABLE $tablePath AS SELECT * FROM source") + checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil) + } } test("REPLACE TABLE AS SELECT on path-based table") { - sql("CREATE TABLE source (id INT, data STRING)") - sql("INSERT INTO source VALUES (1, 'a'), (2, 'b'), (3, 'c')") - sql(s"CREATE TABLE $tablePath AS SELECT * FROM source") - sql(s"REPLACE TABLE $tablePath AS SELECT id FROM source WHERE id > 1") - checkAnswer(spark.table(tablePath), Row(2) :: Row(3) :: Nil) + withTable("source") { + sql("CREATE TABLE source (id INT, data STRING)") + sql("INSERT INTO source VALUES (1, 'a'), (2, 'b'), (3, 'c')") + sql(s"CREATE TABLE $tablePath AS SELECT * FROM source") + sql(s"REPLACE TABLE $tablePath AS SELECT id FROM source WHERE id > 1") + checkAnswer(spark.table(tablePath), Row(2) :: Row(3) :: Nil) + } } test("INSERT OVERWRITE on path-based table") { @@ -178,13 +188,53 @@ class PathBasedTableSuite extends QueryTest with SharedSparkSession { checkAnswer(spark.table(tablePath), Row(9, "z") :: Nil) } - test("DataFrame API regression: read still resolves via SCO") { + test("DataFrame API: read resolves via SCO") { // Create via SQL (exercises the new LookupCatalog SCO seam), read via DataFrame // (exercises the pre-existing DataFrameReader SCO path in DataSourceV2Utils). - // Both paths should land on the same Identifier in the session catalog. + // Both paths should land on the same Identifier in pathformat_cat. sql(s"CREATE TABLE $tablePath (id INT, data STRING)") sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')") val df = spark.read.format("pathformat").load("/path/to/t") checkAnswer(df, Row(1, "a") :: Row(2, "b") :: Nil) } + + test("DataFrame API: write via SCO, read via SQL") { + // Write through DataFrameWriter (exercises the refactored buildDsOptions in the v2 + // write path), read back via SQL to confirm both entry points land on the same + // Identifier under pathformat_cat. + Seq((1, "a"), (2, "b")).toDF("id", "data") + .write.format("pathformat").save("/path/to/t") + checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil) + } + + test("DataFrame API: time travel via SCO") { + // InMemoryTableCatalog implements time travel by appending the version string to + // the identifier name. SCO time-travel options flow through the refactored + // extractCatalogAndIdentifier helper, so this also regresses that path. + sql("CREATE TABLE pathformat.`/p` (id INT)") + sql("CREATE TABLE pathformat.`/pv1` (id INT)") + sql("INSERT INTO pathformat.`/p` VALUES (1)") + sql("INSERT INTO pathformat.`/pv1` VALUES (2)") + val df = spark.read.format("pathformat").option("versionAsOf", "v1").load("/p") + checkAnswer(df, Row(2)) + } + + test("DataFrame API: pure write and read via SCO (no SQL)") { + // Uses only DataFrameWriter/DataFrameReader, so it exercises the v2 SCO entry point + // in DataSourceV2Utils.loadV2Source / the writer's getTableProviderCatalog branch + // independently of LookupCatalog. Survives even when the SQL SCO seam is removed. + Seq((1, "a"), (2, "b")).toDF("id", "data") + .write.format("pathformat").save("/path/to/t") + val df = spark.read.format("pathformat").load("/path/to/t") + checkAnswer(df, Row(1, "a") :: Row(2, "b") :: Nil) + assert(pathformatCat.listTables(Array("pathformat")).map(_.name()).contains("/path/to/t")) + } + + test("DataFrame API: pure time travel via SCO (no SQL)") { + // Pure DataFrame setup so the test does not depend on the SQL SCO seam. + Seq(1).toDF("id").write.format("pathformat").save("/p") + Seq(2).toDF("id").write.format("pathformat").save("/pv1") + val df = spark.read.format("pathformat").option("versionAsOf", "v1").load("/p") + checkAnswer(df, Row(2)) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala index c1fdc7643ef5a..98688a4b1b095 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.Row -import org.apache.spark.sql.connector.catalog.{Aborted, Committed, InMemoryRowLevelOperationTableCatalog, InMemoryTableCatalog, SharedTablesInMemoryRowLevelOperationTableCatalog} +import org.apache.spark.sql.connector.catalog.{Aborted, Committed, InMemoryRowLevelOperationTableCatalog, InMemoryTableCatalog, SharedTablesInMemoryRowLevelOperationTableCatalog, Txn, TxnTable} import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamingQueryWrapper} import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources @@ -28,8 +28,9 @@ import org.apache.spark.sql.streaming.StreamingQuery /** * Tests for transactional writes to path-based tables, where the table is identified by a * bare path with no catalog prefix (e.g. `/path/to/t`), or a connector-prefixed path - * (e.g. `pathformat.`/path/to/t``). The transactional catalog is registered as the session - * catalog (`spark_catalog`). + * (e.g. `pathformat.`/path/to/t``). Bare paths resolve to the session catalog + * (`spark_catalog`, set to a transactional impl); connector-prefixed paths route through + * the SCO seam to a dedicated transactional catalog (`pathformat_cat`). */ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { @@ -43,11 +44,15 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { spark.conf.set( V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[SharedTablesInMemoryRowLevelOperationTableCatalog].getName) + spark.conf.set( + s"spark.sql.catalog.${FakePathBasedSource.CATALOG_NAME}", + classOf[SharedTablesInMemoryRowLevelOperationTableCatalog].getName) } override def afterEach(): Unit = { SharedTablesInMemoryRowLevelOperationTableCatalog.reset() spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) + spark.conf.unset(s"spark.sql.catalog.${FakePathBasedSource.CATALOG_NAME}") super.afterEach() } @@ -56,12 +61,29 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { .asInstanceOf[InMemoryRowLevelOperationTableCatalog] } - private def streamSessionCatalog(query: StreamingQuery): InMemoryRowLevelOperationTableCatalog = { + // The SCO-targeted catalog that connector-prefixed paths route to. Distinct instance + // from `catalog` (session); both share table state via the SharedTables static map. + protected def pathformatCat: InMemoryRowLevelOperationTableCatalog = { + spark.sessionState.catalogManager.catalog(FakePathBasedSource.CATALOG_NAME) + .asInstanceOf[InMemoryRowLevelOperationTableCatalog] + } + + private def streamPathformatCat( + query: StreamingQuery): InMemoryRowLevelOperationTableCatalog = { val session = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sparkSessionForStream - session.sessionState.catalogManager.v2SessionCatalog + session.sessionState.catalogManager.catalog(FakePathBasedSource.CATALOG_NAME) .asInstanceOf[InMemoryRowLevelOperationTableCatalog] } + // Bare-path tests resolve to the session catalog; connector-prefixed tests resolve via + // SCO to `pathformat_cat`. Return whichever catalog actually ran a transaction so test + // bodies can keep using `executeTransaction { ... }` without caring which one fired. + override protected def executeTransaction(func: => Unit): (Txn, Map[String, TxnTable]) = { + val (sessionTxn, tables) = super.executeTransaction(func) + val txn = Option(sessionTxn).orElse(Option(pathformatCat.lastTransaction)).orNull + (txn, tables) + } + private def createPathTable(name: String): Unit = { sql(s"CREATE TABLE $name (id INT, data STRING)") } @@ -83,6 +105,11 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { } assert(txn.currentState === Committed) assert(txn.isClosed) + // The transaction must have gone through the SCO seam to pathformat_cat, not the + // (also-transactional) session catalog. Without these checks the assertions above + // would silently pass when the session catalog absorbs the write. + assert(pathformatCat.lastTransaction eq txn) + assert(catalog.lastTransaction == null) checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Row(2, "b") :: Nil) } @@ -96,6 +123,8 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { } assert(txn.currentState === Committed) assert(txn.isClosed) + assert(pathformatCat.lastTransaction eq txn) + assert(catalog.lastTransaction == null) checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Nil) } @@ -142,13 +171,14 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { query.processAllAvailable() query.stop() - val streamCat = streamSessionCatalog(query) + val streamCat = streamPathformatCat(query) val txn = streamCat.lastTransaction assert(txn != null, "expected a transaction to have been committed") assert(txn.currentState === Committed) assert(txn.isClosed) - // Streaming must not add transactions to the main session catalog. + // Streaming must not add transactions to the main session's catalogs. assert(catalog.observedTransactions.isEmpty) + assert(pathformatCat.observedTransactions.isEmpty) checkAnswer(spark.table(tablePathWithFormat), Row(1) :: Row(2) :: Row(3) :: Nil) } } @@ -171,7 +201,7 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { query.processAllAvailable() query.stop() - val streamCat = streamSessionCatalog(query) + val streamCat = streamPathformatCat(query) val txn = streamCat.lastTransaction assert(txn != null, "expected a transaction to have been committed") assert(txn.currentState === Committed) @@ -180,9 +210,10 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { assert(txn.catalog.txnTables.size === 1) val txnTable = txn.catalog.txnTables.values.head assert(txnTable.scanEvents.size === 1) - // Streaming must not add transactions to the main session catalog beyond the pre-existing - // INSERT transaction. - assert(catalog.observedTransactions.size === 1) + // Streaming must not add transactions to the main session's catalogs beyond the + // pre-existing INSERT transaction (which lives on pathformatCat, not the session). + assert(catalog.observedTransactions.isEmpty) + assert(pathformatCat.observedTransactions.size === 1) } } @@ -209,17 +240,19 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { test("path-based write with same-catalog source succeeds") { createPathTable(tablePathWithFormat) - // ns1.source is resolved via the current catalog (spark_catalog), same as the write target. - sql("CREATE TABLE ns1.source (id INT, data STRING)") - sql("INSERT INTO ns1.source VALUES (1, 'a'), (2, 'b')") + // Source must live in pathformat_cat (same as the SCO-routed write target) for the + // source scan to be tracked in the same transaction. + val sourceName = s"${FakePathBasedSource.CATALOG_NAME}.ns1.source" + sql(s"CREATE TABLE $sourceName (id INT, data STRING)") + sql(s"INSERT INTO $sourceName VALUES (1, 'a'), (2, 'b')") val (txn, txnTables) = executeTransaction { - sql(s"INSERT INTO $tablePathWithFormat SELECT * FROM ns1.source WHERE id = 1") + sql(s"INSERT INTO $tablePathWithFormat SELECT * FROM $sourceName WHERE id = 1") } assert(txn.currentState === Committed) assert(txn.isClosed) // Source scan with predicate was tracked via the transaction catalog. - val sourceTxnTable = txnTables("spark_catalog.ns1.source") + val sourceTxnTable = txnTables(sourceName) assert(sourceTxnTable.scanEvents.size === 1) assert(sourceTxnTable.scanEvents.flatten.exists { case sources.EqualTo("id", 1) => true From bf43ee1ecb0f6cbc062e3bfebcfe0d7292034cd7 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Fri, 22 May 2026 14:00:16 +0000 Subject: [PATCH 18/18] Fix multipart resolution + test nits --- .../sql/execution/datasources/v2/DataSourceV2Utils.scala | 2 +- .../apache/spark/sql/connector/PathBasedTableSuite.scala | 9 +++------ .../sql/connector/PathBasedTableTransactionSuite.scala | 3 +-- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index d2dfb1661e059..46af3ddd87b00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -142,7 +142,7 @@ private[sql] object DataSourceV2Utils extends Logging { DataSource.lookupDataSourceV2(nameParts.head, conf).flatMap { case sco: SupportsCatalogOptions => val optionsWithPath = getOptionsWithPaths( - CaseInsensitiveMap(Map.empty), nameParts.tail.mkString(".")) + CaseInsensitiveMap(Map.empty), nameParts.tail: _*) val dsOptions = buildDsOptions(sco, conf, optionsWithPath) Some(extractCatalogAndIdentifier(sco, dsOptions)) case _ => None diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala index e1ac1bcf38cc6..5a1d796276d7c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala @@ -48,9 +48,7 @@ class PathBasedTableSuite extends QueryTest with SharedSparkSession { } override def afterEach(): Unit = { - // SharedSparkSession reuses one SparkSession across tests. `reset()` drops registered - // non-session catalogs (including pathformat_cat), so the next test starts with a - // fresh InMemoryTableCatalog instance. + // Start each test starts with a fresh InMemoryTableCatalog instance. spark.sessionState.catalogManager.reset() spark.conf.unset(s"spark.sql.catalog.${FakePathBasedSource.CATALOG_NAME}") super.afterEach() @@ -113,7 +111,7 @@ class PathBasedTableSuite extends QueryTest with SharedSparkSession { } } - test("regression: v1 file format direct query still resolves") { + test("v1 file format direct query still resolves") { withTempDir { dir => val path = new java.io.File(dir, "p.parquet").getCanonicalPath Seq((1, "a"), (2, "b")).toDF("id", "data").write.parquet(path) @@ -145,8 +143,7 @@ class PathBasedTableSuite extends QueryTest with SharedSparkSession { test("catalog precedence: same-named catalog wins over SCO data source") { // Register a catalog under the same name as the SCO data source short name. SQL // resolution should route to the catalog; the SCO resolver is consulted only when - // no catalog claims the head, matching v1 file-format precedence (ResolveSQLOnFile) - // and Delta's ResolveDeltaPathTable extension. + // no catalog claims the head. withSQLConf("spark.sql.catalog.pathformat" -> classOf[InMemoryTableCatalog].getName) { sql(s"CREATE TABLE $tablePath (id INT, data STRING)") sql(s"INSERT INTO $tablePath VALUES (1, 'a')") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala index 98688a4b1b095..a552d264ef26d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala @@ -106,8 +106,7 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { assert(txn.currentState === Committed) assert(txn.isClosed) // The transaction must have gone through the SCO seam to pathformat_cat, not the - // (also-transactional) session catalog. Without these checks the assertions above - // would silently pass when the session catalog absorbs the write. + // (also-transactional) session catalog. assert(pathformatCat.lastTransaction eq txn) assert(catalog.lastTransaction == null) checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Row(2, "b") :: Nil)