diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 3aad52dbd1d01..01f900e22537c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -32,6 +32,19 @@ import org.apache.spark.sql.connector.catalog.transactions.Transaction import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf +/** + * Resolves a multipart SQL identifier whose head names a path-based data source format + * (e.g. `pathformat.\`/path/to/t\``) to the owning catalog and a connector-canonicalized + * identifier, when the format implements [[SupportsCatalogOptions]]. + */ +private[sql] trait DataSourceCatalogResolver { + def resolve(nameParts: Seq[String]): Option[(String, Identifier)] +} + +private[sql] object DataSourceCatalogResolver { + val NoOp: DataSourceCatalogResolver = (_: Seq[String]) => None +} + /** * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow * the caller to look up a catalog by name. @@ -46,7 +59,9 @@ import org.apache.spark.sql.internal.SQLConf private[sql] class CatalogManager( val defaultSessionCatalog: CatalogPlugin, - val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging { + val v1SessionCatalog: SessionCatalog, + val dataSourceCatalogResolver: DataSourceCatalogResolver = + DataSourceCatalogResolver.NoOp) extends SQLConfHelper with Logging { import CatalogManager.SESSION_CATALOG_NAME import CatalogV2Util._ @@ -74,6 +89,14 @@ class CatalogManager( def withTransaction(transaction: Transaction): CatalogManager = new TransactionAwareCatalogManager(this, transaction) + /** + * Returns the catalog name and connector-canonicalized identifier for a multipart SQL name + * whose head is a [[SupportsCatalogOptions]] data source format. Returns None if the format + * head is unknown or does not implement [[SupportsCatalogOptions]]. + */ + def catalogAndIdentForDataSource(nameParts: Seq[String]): Option[(String, Identifier)] = + dataSourceCatalogResolver.resolve(nameParts) + def isCatalogRegistered(name: String): Boolean = { try { catalog(name) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index 14c0663730324..34bfbe4d39198 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -135,7 +135,18 @@ private[sql] trait LookupCatalog extends Logging { Some((catalog, ident)) } catch { case _: CatalogNotFoundException => - Some((currentCatalog, nameParts.asIdentifier)) + // No catalog matched. As a fallback, try path-based data sources: + // formats implementing SupportsCatalogOptions (e.g. `pathformat.`/path/to/t``) + // route to the catalog the connector designates. If no SCO format claims the + // identifier head, fall through to currentCatalog and let later analysis raise + // table-not-found. This matches the v1 file-format precedence (catalog first, + // path-based as fallback). + Option(catalogManager.catalogAndIdentForDataSource(nameParts)).flatten match { + case Some((catName, providerIdent)) => + Some((catalogManager.catalog(catName), providerIdent)) + case None => + Some((currentCatalog, nameParts.asIdentifier)) + } } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala index 70079357b6dde..76ccd7480d97e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala @@ -32,7 +32,10 @@ import org.apache.spark.sql.connector.catalog.transactions.Transaction private[sql] class TransactionAwareCatalogManager( delegate: CatalogManager, txn: Transaction) - extends CatalogManager(delegate.defaultSessionCatalog, delegate.v1SessionCatalog) { + extends CatalogManager( + delegate.defaultSessionCatalog, + delegate.v1SessionCatalog, + delegate.dataSourceCatalogResolver) { override val tempVariableManager: TempVariableManager = delegate.tempVariableManager diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala index 3e27605940383..2ec100ba0f59b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala @@ -46,7 +46,6 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger} -import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils @@ -253,19 +252,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D // file source v2 does not support streaming yet. classOf[FileDataSourceV2].isAssignableFrom(cls) - val optionsWithPath = if (path.isEmpty) { - extraOptions - } else { - extraOptions + ("path" -> path.get) - } + val optionsWithPath = DataSourceV2Utils.getOptionsWithPaths(extraOptions, path.toSeq: _*) val sink = if (classOf[TableProvider].isAssignableFrom(cls) && !useV1Source) { val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] - val sessionOptions = DataSourceV2Utils.extractSessionConfigs( - source = provider, conf = ds.sparkSession.sessionState.conf) - val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++ - optionsWithPath.originalMap - val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) + val dsOptions = DataSourceV2Utils.buildDsOptions( + provider, ds.sparkSession.sessionState.conf, optionsWithPath) // If the source accepts external table metadata, here we pass the schema of input query // to `getTable`. This is for avoiding schema inference, which can be very expensive. // If the query schema is not compatible with the existing data, the behavior is undefined. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 65bc57de907b2..3ca3032be485b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -22,7 +22,6 @@ import java.util.UUID import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import javax.annotation.concurrent.GuardedBy -import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal import org.apache.hadoop.fs.Path @@ -32,22 +31,21 @@ import org.apache.spark.internal.LogKeys.EXTENDED_EXPLAIN_GENERATOR import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row} import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubqueryAliases, LazyExpression, NameParameterizedQuery, UnresolvedRelation, UnsupportedOperationChecker} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, LazyExpression, NameParameterizedQuery, UnsupportedOperationChecker} import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, TransactionalWrite => TransactionalWritePlan, Union, UnresolvedWith, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, Union, UnresolvedWith, WithCTE} import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} import org.apache.spark.sql.catalyst.transactions.TransactionUtils import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.classic.SparkSession -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, LookupCatalog, SupportsCatalogOptions, TableCatalog, TransactionalCatalogPlugin} +import org.apache.spark.sql.connector.catalog.LookupCatalog import org.apache.spark.sql.connector.catalog.transactions.Transaction import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ROOT_ID_KEY import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan} import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan} -import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, TransactionalExec, V2TableRefreshUtil} +import org.apache.spark.sql.execution.datasources.v2.{TransactionalExec, V2TableRefreshUtil} import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters import org.apache.spark.sql.execution.exchange.EnsureRequirements import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery @@ -56,7 +54,6 @@ import org.apache.spark.sql.execution.streaming.runtime.{IncrementalExecution, W import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.scripting.SqlScriptingExecution import org.apache.spark.sql.streaming.OutputMode -import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.{LazyTry, Utils, UUIDv7Generator} import org.apache.spark.util.ArrayImplicits._ @@ -119,16 +116,9 @@ class QueryExecution( analyzerOpt.flatMap(_.catalogManager.transaction).orElse { // Only begin a new transaction for outer QEs that lead to execution. if (mode != CommandExecutionMode.SKIP) { - def resolve(w: TransactionalWritePlan): Option[TransactionalCatalogPlugin] = - pathBased(w) match { - case Some(c: TransactionalCatalogPlugin) => Some(c) - case Some(_) => None - // If the path is not data source based, fallback to catalog based resolution. - case None => TransactionalWrite.unapply(w) - } val catalog = logical match { - case UnresolvedWith(w: TransactionalWritePlan, _, _) => resolve(w) - case w: TransactionalWritePlan => resolve(w) + case UnresolvedWith(TransactionalWrite(c), _, _) => Some(c) + case TransactionalWrite(c) => Some(c) case _ => None } catalog.map(TransactionUtils.beginTransaction) @@ -139,34 +129,6 @@ class QueryExecution( } private def transactionOpt: Option[Transaction] = lazyTransactionOpt.get - // For path-based tables (e.g. `format.`/path/to/table``) the first identifier part is a - // connector name. SupportsCatalogOptions on the connector tells us which catalog actually - // owns the table. Returns Some(catalog) if parts.head is a recognized SupportsCatalogOptions - // data source (caller decides whether the catalog is transactional), or None to fall through - // to the catalog-based extractor. - private def pathBased(write: TransactionalWritePlan): Option[TableCatalog] = - EliminateSubqueryAliases(write.table) match { - case UnresolvedRelation(parts, _, _) if parts.length > 1 => - try { - DataSource.lookupDataSourceV2(parts.head, sparkSession.sessionState.conf) - .collect { case sco: SupportsCatalogOptions => sco } - .map { sco => - val sessionConfigs = DataSourceV2Utils.extractSessionConfigs( - sco, sparkSession.sessionState.conf) - // Pass the entire identifier as option. The connector can decide how to parse it - // if needed. - val options = sessionConfigs + ("identifier" -> parts.mkString(".")) - CatalogV2Util.getTableProviderCatalog( - sco, catalogManager, new CaseInsensitiveStringMap(options.asJava)) - } - } catch { - // The head of the multipart identifier is not a registered data source. - // Fallback to catalog-based detection. - case _: ClassNotFoundException => None - } - case _ => None - } - // Per-query analyzer: uses a transaction-aware CatalogManager when a transaction is active, // so that all catalog lookups and rule applications during analysis see the correct state // without relying on thread-local context. Any nested QueryExecution that is created during diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala index a3b5c5aeb7995..46af3ddd87b00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.TimeTravelSpec import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSupport, StagedTable, StagingTableCatalog, SupportsCatalogOptions, SupportsRead, Table, TableProvider} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, DataSourceCatalogResolver, Identifier, SessionConfigSupport, StagedTable, StagingTableCatalog, SupportsCatalogOptions, SupportsRead, Table, TableProvider} import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SQLExecution @@ -99,6 +99,60 @@ private[sql] object DataSourceV2Utils extends Logging { } } + /** + * Builds the [[CaseInsensitiveStringMap]] passed to a v2 [[TableProvider]]: session configs + * extracted from the provider, merged with the caller-supplied options-with-path map. + */ + def buildDsOptions( + provider: TableProvider, + conf: SQLConf, + optionsWithPath: CaseInsensitiveMap[String]): CaseInsensitiveStringMap = { + val sessionOptions = extractSessionConfigs(provider, conf) + val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++ + optionsWithPath.originalMap + new CaseInsensitiveStringMap(finalOptions.asJava) + } + + /** + * Extracts the catalog name and connector-canonical identifier from a + * [[SupportsCatalogOptions]] provider. Shared by all SCO entry points (DataFrame reader, SQL + * multipart-name resolution) so they observe identical null-handling semantics: + * `extractCatalog` returning null falls back to the session catalog, matching + * [[CatalogV2Util.getTableProviderCatalog]]. + */ + def extractCatalogAndIdentifier( + provider: SupportsCatalogOptions, + dsOptions: CaseInsensitiveStringMap): (String, Identifier) = { + val ident = provider.extractIdentifier(dsOptions) + val catalogName = Option(provider.extractCatalog(dsOptions)) + .getOrElse(CatalogManager.SESSION_CATALOG_NAME) + (catalogName, ident) + } + + /** + * Resolver bound to a session [[SQLConf]] that maps a multipart SQL identifier + * (e.g. `pathformat.\`/path/to/t\``) to a `(catalogName, identifier)` pair when the head + * names a registered [[SupportsCatalogOptions]] data source. Returns `None` for non-SCO + * sources or unknown format heads, letting the caller fall back to standard catalog + * resolution. + */ + def supportsCatalogOptionsResolver(conf: SQLConf): DataSourceCatalogResolver = + (nameParts: Seq[String]) => + try { + DataSource.lookupDataSourceV2(nameParts.head, conf).flatMap { + case sco: SupportsCatalogOptions => + val optionsWithPath = getOptionsWithPaths( + CaseInsensitiveMap(Map.empty), nameParts.tail: _*) + val dsOptions = buildDsOptions(sco, conf, optionsWithPath) + Some(extractCatalogAndIdentifier(sco, dsOptions)) + case _ => None + } + } catch { + // The format name is not a registered data source. Fall through and let the caller + // treat it as a catalog/namespace name. + case _: ClassNotFoundException => None + } + def loadV2Source( sparkSession: SparkSession, provider: TableProvider, @@ -108,23 +162,16 @@ private[sql] object DataSourceV2Utils extends Logging { paths: String*): Option[LogicalPlan] = { val catalogManager = sparkSession.sessionState.catalogManager val conf = sparkSession.sessionState.conf - val sessionOptions = DataSourceV2Utils.extractSessionConfigs(provider, conf) - val optionsWithPath = getOptionsWithPaths(extraOptions, paths: _*) - - val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++ - optionsWithPath.originalMap - val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) + val dsOptions = buildDsOptions(provider, conf, optionsWithPath) val (table, catalog, ident, timeTravelSpec) = provider match { case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty => throw new IllegalArgumentException( s"$source does not support user specified schema. Please don't specify the schema.") case hasCatalog: SupportsCatalogOptions => - val ident = hasCatalog.extractIdentifier(dsOptions) - val catalog = CatalogV2Util.getTableProviderCatalog( - hasCatalog, - catalogManager, - dsOptions) + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val (catalogName, ident) = extractCatalogAndIdentifier(hasCatalog, dsOptions) + val catalog = catalogManager.catalog(catalogName).asTableCatalog val version = hasCatalog.extractTimeTravelVersion(dsOptions) val timestamp = hasCatalog.extractTimeTravelTimestamp(dsOptions) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index c82651595bc59..78e0a1bd29587 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.classic.{SparkSession, Strategy, StreamingCheckpointManager, StreamingQueryManager, UDFRegistration} -import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.{CatalogManager, DataSourceCatalogResolver} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser} import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.aggregate.{ResolveEncodersInScalaAgg, Scal import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin import org.apache.spark.sql.execution.command.{CheckViewReferences, CommandCheck} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.v2.{TableCapabilityCheck, V2SessionCatalog} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, TableCapabilityCheck, V2SessionCatalog} import org.apache.spark.sql.execution.externalUDF.{ClassicExternalUDFPlanner, ExternalUDFPlanner, UnifiedExternalUDFPlanner} import org.apache.spark.sql.execution.streaming.runtime.ResolveWriteToStream @@ -162,8 +162,11 @@ abstract class BaseSessionStateBuilder( protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog) + protected lazy val dataSourceCatalogResolver: DataSourceCatalogResolver = + DataSourceV2Utils.supportsCatalogOptionsResolver(conf) + protected lazy val catalogManager = { - val cm = new CatalogManager(v2SessionCatalog, catalog) + val cm = new CatalogManager(v2SessionCatalog, catalog, dataSourceCatalogResolver) parentState.foreach(ps => cm.copySessionPathFrom(ps.catalogManager)) cm } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala index d9faa7d7473dc..031dc0ada21e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FakeV2Provider.scala @@ -18,10 +18,12 @@ package org.apache.spark.sql.connector import java.util +import java.util.Optional -import org.apache.spark.sql.connector.catalog.{SupportsV1OverwriteWithSaveAsTable, Table, TableProvider} +import org.apache.spark.sql.connector.catalog.{Identifier, SessionConfigSupport, SupportsCatalogOptions, SupportsV1OverwriteWithSaveAsTable, Table, TableProvider} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder} +import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -93,3 +95,52 @@ class FakeV2ProviderWithV1SaveAsTableOverwriteWriteOptionDisabled with SupportsV1OverwriteWithSaveAsTable { override def addV1OverwriteWithSaveAsTableOption(): Boolean = false } + +/** + * Simulates a path-based connector that implements [[SupportsCatalogOptions]] and routes + * `pathformat.\`/path/to/t\`` SQL identifiers to a dedicated catalog (`pathformat_cat`). + * Tests register that catalog and assert against it so the SCO seam is exercised + * unambiguously: without SCO, `CatalogAndIdentifier` falls back to the current catalog + * (session catalog) and the target catalog stays empty. + */ +class FakePathBasedSource + extends FakeV2ProviderWithCustomSchema + with SupportsCatalogOptions + with DataSourceRegister { + + override def shortName(): String = "pathformat" + + override def extractCatalog(options: CaseInsensitiveStringMap): String = + FakePathBasedSource.CATALOG_NAME + + override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = + Identifier.of(Array(shortName()), options.get("path")) + + override def extractTimeTravelVersion(options: CaseInsensitiveStringMap): Optional[String] = + Optional.ofNullable(options.get("versionAsOf")) +} + +object FakePathBasedSource { + val CATALOG_NAME: String = "pathformat_cat" +} + +/** + * Like [[FakePathBasedSource]] but resolves the owning catalog from the session config + * `spark.datasource.pathformat2.catalog` instead of always returning null. This simulates + * a connector that lets users configure the target catalog. + */ +class FakePathBasedSourceWithSessionConfig + extends FakeV2ProviderWithCustomSchema + with SupportsCatalogOptions + with SessionConfigSupport + with DataSourceRegister { + + override def shortName(): String = "pathformat2" + + override def keyPrefix: String = "pathformat2" + + override def extractCatalog(options: CaseInsensitiveStringMap): String = options.get("catalog") + + override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = + Identifier.of(Array(shortName()), options.get("path")) +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala new file mode 100644 index 0000000000000..5a1d796276d7c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableSuite.scala @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Non-transactional tests for SQL resolution of path-based tables surfaced by a + * [[org.apache.spark.sql.connector.catalog.SupportsCatalogOptions]] data source + * (e.g. `pathformat.`/path/to/t``). [[FakePathBasedSource]] routes resolution to a + * dedicated `pathformat_cat` catalog rather than the session catalog, so assertions + * against that catalog unambiguously confirm the SCO seam fired — without SCO, + * `CatalogAndIdentifier`'s fallback lands in the (default) session catalog and the + * named catalog stays empty. + */ +class PathBasedTableSuite extends QueryTest with SharedSparkSession { + + import testImplicits._ + + private val tablePath = "pathformat.`/path/to/t`" + + private def pathformatCat: InMemoryTableCatalog = + spark.sessionState.catalogManager.catalog(FakePathBasedSource.CATALOG_NAME) + .asInstanceOf[InMemoryTableCatalog] + + override def beforeEach(): Unit = { + super.beforeEach() + spark.conf.set( + s"spark.sql.catalog.${FakePathBasedSource.CATALOG_NAME}", + classOf[InMemoryTableCatalog].getName) + } + + override def afterEach(): Unit = { + // Start each test starts with a fresh InMemoryTableCatalog instance. + spark.sessionState.catalogManager.reset() + spark.conf.unset(s"spark.sql.catalog.${FakePathBasedSource.CATALOG_NAME}") + super.afterEach() + } + + test("CREATE then SELECT on path-based table") { + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')") + checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil) + // The SCO resolver routed creation/reads to pathformat_cat. Without the seam, + // CREATE would fall through to the (default) session catalog and fail. + assert(pathformatCat.listTables(Array("pathformat")).map(_.name()).contains("/path/to/t")) + } + + test("DESCRIBE TABLE resolves path-based table") { + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + checkAnswer( + sql(s"DESCRIBE TABLE $tablePath").select("col_name", "data_type"), + Row("id", "int") :: Row("data", "string") :: Nil) + } + + test("ALTER TABLE on path-based table") { + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + sql(s"ALTER TABLE $tablePath ADD COLUMNS (extra DOUBLE)") + val columns = sql(s"DESCRIBE TABLE $tablePath").collect().map(_.getString(0)).toSet + assert(Set("id", "data", "extra").subsetOf(columns)) + } + + test("DROP TABLE on path-based table") { + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + sql(s"DROP TABLE $tablePath") + intercept[AnalysisException] { + sql(s"SELECT * FROM $tablePath") + } + } + + test("JOIN across two path-based tables") { + val a = "pathformat.`/a`" + val b = "pathformat.`/b`" + sql(s"CREATE TABLE $a (id INT, x STRING)") + sql(s"CREATE TABLE $b (id INT, y STRING)") + sql(s"INSERT INTO $a VALUES (1, 'x1'), (2, 'x2')") + sql(s"INSERT INTO $b VALUES (1, 'y1'), (2, 'y2')") + checkAnswer( + sql(s"SELECT a.id, a.x, b.y FROM $a a JOIN $b b ON a.id = b.id ORDER BY a.id"), + Row(1, "x1", "y1") :: Row(2, "x2", "y2") :: Nil) + } + + test("session-config catalog routes non-transactional reads") { + val target = "tgt" + withSQLConf( + s"spark.sql.catalog.$target" -> classOf[InMemoryTableCatalog].getName, + s"spark.datasource.pathformat2.catalog" -> target) { + sql("CREATE TABLE pathformat2.`/p` (id INT, data STRING)") + sql("INSERT INTO pathformat2.`/p` VALUES (1, 'a')") + checkAnswer(spark.table("pathformat2.`/p`"), Row(1, "a") :: Nil) + val tgt = spark.sessionState.catalogManager.catalog(target) + .asInstanceOf[InMemoryTableCatalog] + assert(tgt.listTables(Array("pathformat2")).map(_.name()).contains("/p")) + } + } + + test("v1 file format direct query still resolves") { + withTempDir { dir => + val path = new java.io.File(dir, "p.parquet").getCanonicalPath + Seq((1, "a"), (2, "b")).toDF("id", "data").write.parquet(path) + checkAnswer(sql(s"SELECT * FROM parquet.`$path`"), Row(1, "a") :: Row(2, "b") :: Nil) + } + } + + test("unknown format head produces table-not-found, not ClassNotFoundException") { + val e = intercept[AnalysisException] { + sql("SELECT * FROM unknown_fmt.`/path/to/t`") + } + // The format head is not a registered data source and not a catalog. Resolution + // falls through to a normal table-not-found error. + assert(e.getCondition == "TABLE_OR_VIEW_NOT_FOUND") + } + + test("VERSION AS OF on path-based table") { + val base = "pathformat.`/p`" + // InMemoryTableCatalog implements time travel by appending the version string to the + // identifier name (see loadTable(ident, version) - it looks up name + version). + val versioned = "pathformat.`/pv1`" + sql(s"CREATE TABLE $base (id INT)") + sql(s"CREATE TABLE $versioned (id INT)") + sql(s"INSERT INTO $base VALUES (1)") + sql(s"INSERT INTO $versioned VALUES (2)") + checkAnswer(sql(s"SELECT * FROM $base VERSION AS OF 'v1'"), Row(2)) + } + + test("catalog precedence: same-named catalog wins over SCO data source") { + // Register a catalog under the same name as the SCO data source short name. SQL + // resolution should route to the catalog; the SCO resolver is consulted only when + // no catalog claims the head. + withSQLConf("spark.sql.catalog.pathformat" -> classOf[InMemoryTableCatalog].getName) { + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + sql(s"INSERT INTO $tablePath VALUES (1, 'a')") + checkAnswer(spark.table(tablePath), Row(1, "a") :: Nil) + + // Table lives in the homonym catalog at identifier (ns=[], name="/path/to/t"), + // and the SCO-targeted catalog (pathformat_cat) is untouched because the SCO + // resolver was never consulted. + val homonymCat = spark.sessionState.catalogManager.catalog("pathformat") + .asInstanceOf[InMemoryTableCatalog] + assert(homonymCat.listTables(Array.empty).map(_.name()).contains("/path/to/t")) + assert(!pathformatCat.namespaceExists(Array("pathformat"))) + } + } + + test("CREATE TABLE AS SELECT on path-based table") { + withTable("source") { + sql("CREATE TABLE source (id INT, data STRING)") + sql("INSERT INTO source VALUES (1, 'a'), (2, 'b')") + sql(s"CREATE TABLE $tablePath AS SELECT * FROM source") + checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil) + } + } + + test("REPLACE TABLE AS SELECT on path-based table") { + withTable("source") { + sql("CREATE TABLE source (id INT, data STRING)") + sql("INSERT INTO source VALUES (1, 'a'), (2, 'b'), (3, 'c')") + sql(s"CREATE TABLE $tablePath AS SELECT * FROM source") + sql(s"REPLACE TABLE $tablePath AS SELECT id FROM source WHERE id > 1") + checkAnswer(spark.table(tablePath), Row(2) :: Row(3) :: Nil) + } + } + + test("INSERT OVERWRITE on path-based table") { + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')") + sql(s"INSERT OVERWRITE $tablePath VALUES (9, 'z')") + checkAnswer(spark.table(tablePath), Row(9, "z") :: Nil) + } + + test("DataFrame API: read resolves via SCO") { + // Create via SQL (exercises the new LookupCatalog SCO seam), read via DataFrame + // (exercises the pre-existing DataFrameReader SCO path in DataSourceV2Utils). + // Both paths should land on the same Identifier in pathformat_cat. + sql(s"CREATE TABLE $tablePath (id INT, data STRING)") + sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')") + val df = spark.read.format("pathformat").load("/path/to/t") + checkAnswer(df, Row(1, "a") :: Row(2, "b") :: Nil) + } + + test("DataFrame API: write via SCO, read via SQL") { + // Write through DataFrameWriter (exercises the refactored buildDsOptions in the v2 + // write path), read back via SQL to confirm both entry points land on the same + // Identifier under pathformat_cat. + Seq((1, "a"), (2, "b")).toDF("id", "data") + .write.format("pathformat").save("/path/to/t") + checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil) + } + + test("DataFrame API: time travel via SCO") { + // InMemoryTableCatalog implements time travel by appending the version string to + // the identifier name. SCO time-travel options flow through the refactored + // extractCatalogAndIdentifier helper, so this also regresses that path. + sql("CREATE TABLE pathformat.`/p` (id INT)") + sql("CREATE TABLE pathformat.`/pv1` (id INT)") + sql("INSERT INTO pathformat.`/p` VALUES (1)") + sql("INSERT INTO pathformat.`/pv1` VALUES (2)") + val df = spark.read.format("pathformat").option("versionAsOf", "v1").load("/p") + checkAnswer(df, Row(2)) + } + + test("DataFrame API: pure write and read via SCO (no SQL)") { + // Uses only DataFrameWriter/DataFrameReader, so it exercises the v2 SCO entry point + // in DataSourceV2Utils.loadV2Source / the writer's getTableProviderCatalog branch + // independently of LookupCatalog. Survives even when the SQL SCO seam is removed. + Seq((1, "a"), (2, "b")).toDF("id", "data") + .write.format("pathformat").save("/path/to/t") + val df = spark.read.format("pathformat").load("/path/to/t") + checkAnswer(df, Row(1, "a") :: Row(2, "b") :: Nil) + assert(pathformatCat.listTables(Array("pathformat")).map(_.name()).contains("/path/to/t")) + } + + test("DataFrame API: pure time travel via SCO (no SQL)") { + // Pure DataFrame setup so the test does not depend on the SQL SCO seam. + Seq(1).toDF("id").write.format("pathformat").save("/p") + Seq(2).toDF("id").write.format("pathformat").save("/pv1") + val df = spark.read.format("pathformat").option("versionAsOf", "v1").load("/p") + checkAnswer(df, Row(2)) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala index c81f53673af3a..a552d264ef26d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala @@ -19,18 +19,18 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.Row -import org.apache.spark.sql.connector.catalog.{Aborted, Committed, Identifier, InMemoryRowLevelOperationTableCatalog, InMemoryTableCatalog, SessionConfigSupport, SharedTablesInMemoryRowLevelOperationTableCatalog, SupportsCatalogOptions} +import org.apache.spark.sql.connector.catalog.{Aborted, Committed, InMemoryRowLevelOperationTableCatalog, InMemoryTableCatalog, SharedTablesInMemoryRowLevelOperationTableCatalog, Txn, TxnTable} import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamingQueryWrapper} import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION -import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.sources import org.apache.spark.sql.streaming.StreamingQuery -import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * Tests for transactional writes to path-based tables, where the table is identified by a * bare path with no catalog prefix (e.g. `/path/to/t`), or a connector-prefixed path - * (e.g. `pathformat.`/path/to/t``). The transactional catalog is registered as the session - * catalog (`spark_catalog`). + * (e.g. `pathformat.`/path/to/t``). Bare paths resolve to the session catalog + * (`spark_catalog`, set to a transactional impl); connector-prefixed paths route through + * the SCO seam to a dedicated transactional catalog (`pathformat_cat`). */ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { @@ -44,11 +44,15 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { spark.conf.set( V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[SharedTablesInMemoryRowLevelOperationTableCatalog].getName) + spark.conf.set( + s"spark.sql.catalog.${FakePathBasedSource.CATALOG_NAME}", + classOf[SharedTablesInMemoryRowLevelOperationTableCatalog].getName) } override def afterEach(): Unit = { SharedTablesInMemoryRowLevelOperationTableCatalog.reset() spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) + spark.conf.unset(s"spark.sql.catalog.${FakePathBasedSource.CATALOG_NAME}") super.afterEach() } @@ -57,12 +61,29 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { .asInstanceOf[InMemoryRowLevelOperationTableCatalog] } - private def streamSessionCatalog(query: StreamingQuery): InMemoryRowLevelOperationTableCatalog = { + // The SCO-targeted catalog that connector-prefixed paths route to. Distinct instance + // from `catalog` (session); both share table state via the SharedTables static map. + protected def pathformatCat: InMemoryRowLevelOperationTableCatalog = { + spark.sessionState.catalogManager.catalog(FakePathBasedSource.CATALOG_NAME) + .asInstanceOf[InMemoryRowLevelOperationTableCatalog] + } + + private def streamPathformatCat( + query: StreamingQuery): InMemoryRowLevelOperationTableCatalog = { val session = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sparkSessionForStream - session.sessionState.catalogManager.v2SessionCatalog + session.sessionState.catalogManager.catalog(FakePathBasedSource.CATALOG_NAME) .asInstanceOf[InMemoryRowLevelOperationTableCatalog] } + // Bare-path tests resolve to the session catalog; connector-prefixed tests resolve via + // SCO to `pathformat_cat`. Return whichever catalog actually ran a transaction so test + // bodies can keep using `executeTransaction { ... }` without caring which one fired. + override protected def executeTransaction(func: => Unit): (Txn, Map[String, TxnTable]) = { + val (sessionTxn, tables) = super.executeTransaction(func) + val txn = Option(sessionTxn).orElse(Option(pathformatCat.lastTransaction)).orNull + (txn, tables) + } + private def createPathTable(name: String): Unit = { sql(s"CREATE TABLE $name (id INT, data STRING)") } @@ -84,6 +105,10 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { } assert(txn.currentState === Committed) assert(txn.isClosed) + // The transaction must have gone through the SCO seam to pathformat_cat, not the + // (also-transactional) session catalog. + assert(pathformatCat.lastTransaction eq txn) + assert(catalog.lastTransaction == null) checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Row(2, "b") :: Nil) } @@ -97,6 +122,8 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { } assert(txn.currentState === Committed) assert(txn.isClosed) + assert(pathformatCat.lastTransaction eq txn) + assert(catalog.lastTransaction == null) checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Nil) } @@ -116,6 +143,8 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { assert(txnCat.lastTransaction == null) } + txnCat.lastTransaction = null // Reset to distinguish from block 1. + // Transactional catalog configured: pathBased resolves txncat as a // TransactionalCatalogPlugin and opens the transaction there instead. withSQLConf("spark.datasource.pathformat2.catalog" -> "txncat") { @@ -141,13 +170,14 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { query.processAllAvailable() query.stop() - val streamCat = streamSessionCatalog(query) + val streamCat = streamPathformatCat(query) val txn = streamCat.lastTransaction assert(txn != null, "expected a transaction to have been committed") assert(txn.currentState === Committed) assert(txn.isClosed) - // Streaming must not add transactions to the main session catalog. + // Streaming must not add transactions to the main session's catalogs. assert(catalog.observedTransactions.isEmpty) + assert(pathformatCat.observedTransactions.isEmpty) checkAnswer(spark.table(tablePathWithFormat), Row(1) :: Row(2) :: Row(3) :: Nil) } } @@ -170,7 +200,7 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { query.processAllAvailable() query.stop() - val streamCat = streamSessionCatalog(query) + val streamCat = streamPathformatCat(query) val txn = streamCat.lastTransaction assert(txn != null, "expected a transaction to have been committed") assert(txn.currentState === Committed) @@ -179,9 +209,10 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { assert(txn.catalog.txnTables.size === 1) val txnTable = txn.catalog.txnTables.values.head assert(txnTable.scanEvents.size === 1) - // Streaming must not add transactions to the main session catalog beyond the pre-existing - // INSERT transaction. - assert(catalog.observedTransactions.size === 1) + // Streaming must not add transactions to the main session's catalogs beyond the + // pre-existing INSERT transaction (which lives on pathformatCat, not the session). + assert(catalog.observedTransactions.isEmpty) + assert(pathformatCat.observedTransactions.size === 1) } } @@ -205,45 +236,28 @@ class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { assert(txn.currentState === Aborted) assert(txn.isClosed) } -} - -/** - * Simulates a path-based connector (e.g. Delta) that implements [[SupportsCatalogOptions]] - * to route `pathformat.\`/path/to/t\`` SQL identifiers to the session catalog. Returning - * null from [[extractCatalog]] signals that the session catalog (`spark_catalog`) owns the - * table, matching Delta's behavior where DeltaCatalog is registered as spark_catalog. - */ -class FakePathBasedSource - extends FakeV2ProviderWithCustomSchema - with SupportsCatalogOptions - with DataSourceRegister { - - override def shortName(): String = "pathformat" - // Use the session catalog. - override def extractCatalog(options: CaseInsensitiveStringMap): String = null - - // Not used in the transactional path. - override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = null -} - -/** - * Like [[FakePathBasedSource]] but resolves the owning catalog from the session config - * `spark.datasource.pathformat2.catalog` instead of always returning null. This simulates - * a connector that lets users configure the target catalog. - */ -class FakePathBasedSourceWithSessionConfig - extends FakeV2ProviderWithCustomSchema - with SupportsCatalogOptions - with SessionConfigSupport - with DataSourceRegister { - - override def shortName(): String = "pathformat2" - - override def keyPrefix: String = "pathformat2" - - override def extractCatalog(options: CaseInsensitiveStringMap): String = options.get("catalog") + test("path-based write with same-catalog source succeeds") { + createPathTable(tablePathWithFormat) + // Source must live in pathformat_cat (same as the SCO-routed write target) for the + // source scan to be tracked in the same transaction. + val sourceName = s"${FakePathBasedSource.CATALOG_NAME}.ns1.source" + sql(s"CREATE TABLE $sourceName (id INT, data STRING)") + sql(s"INSERT INTO $sourceName VALUES (1, 'a'), (2, 'b')") + + val (txn, txnTables) = executeTransaction { + sql(s"INSERT INTO $tablePathWithFormat SELECT * FROM $sourceName WHERE id = 1") + } + assert(txn.currentState === Committed) + assert(txn.isClosed) + // Source scan with predicate was tracked via the transaction catalog. + val sourceTxnTable = txnTables(sourceName) + assert(sourceTxnTable.scanEvents.size === 1) + assert(sourceTxnTable.scanEvents.flatten.exists { + case sources.EqualTo("id", 1) => true + case _ => false + }) + checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Nil) + } - // Not used in the transactional path. - override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = null }