Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ import org.apache.spark.sql.connector.catalog.transactions.Transaction
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf

/**
* Resolves a multipart SQL identifier whose head names a path-based data source format
* (e.g. `pathformat.\`/path/to/t\``) to the owning catalog and a connector-canonicalized
* identifier, when the format implements [[SupportsCatalogOptions]].
*/
private[sql] trait DataSourceCatalogResolver {
def resolve(nameParts: Seq[String]): Option[(String, Identifier)]
}

private[sql] object DataSourceCatalogResolver {
val NoOp: DataSourceCatalogResolver = (_: Seq[String]) => None
}

/**
* A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow
* the caller to look up a catalog by name.
Expand All @@ -46,7 +59,9 @@ import org.apache.spark.sql.internal.SQLConf
private[sql]
class CatalogManager(
val defaultSessionCatalog: CatalogPlugin,
val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging {
val v1SessionCatalog: SessionCatalog,
val dataSourceCatalogResolver: DataSourceCatalogResolver =
DataSourceCatalogResolver.NoOp) extends SQLConfHelper with Logging {
import CatalogManager.SESSION_CATALOG_NAME
import CatalogV2Util._

Expand Down Expand Up @@ -74,6 +89,14 @@ class CatalogManager(
def withTransaction(transaction: Transaction): CatalogManager =
new TransactionAwareCatalogManager(this, transaction)

/**
* Returns the catalog name and connector-canonicalized identifier for a multipart SQL name
* whose head is a [[SupportsCatalogOptions]] data source format. Returns None if the format
* head is unknown or does not implement [[SupportsCatalogOptions]].
*/
def catalogAndIdentForDataSource(nameParts: Seq[String]): Option[(String, Identifier)] =
dataSourceCatalogResolver.resolve(nameParts)

def isCatalogRegistered(name: String): Boolean = {
try {
catalog(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,18 @@ private[sql] trait LookupCatalog extends Logging {
Some((catalog, ident))
} catch {
case _: CatalogNotFoundException =>
Some((currentCatalog, nameParts.asIdentifier))
// No catalog matched. As a fallback, try path-based data sources:
// formats implementing SupportsCatalogOptions (e.g. `pathformat.`/path/to/t``)
// route to the catalog the connector designates. If no SCO format claims the
// identifier head, fall through to currentCatalog and let later analysis raise
// table-not-found. This matches the v1 file-format precedence (catalog first,
// path-based as fallback).
Option(catalogManager.catalogAndIdentForDataSource(nameParts)).flatten match {
case Some((catName, providerIdent)) =>
Some((catalogManager.catalog(catName), providerIdent))
case None =>
Some((currentCatalog, nameParts.asIdentifier))
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ import org.apache.spark.sql.connector.catalog.transactions.Transaction
private[sql] class TransactionAwareCatalogManager(
delegate: CatalogManager,
txn: Transaction)
extends CatalogManager(delegate.defaultSessionCatalog, delegate.v1SessionCatalog) {
extends CatalogManager(
delegate.defaultSessionCatalog,
delegate.v1SessionCatalog,
delegate.dataSourceCatalogResolver) {

override val tempVariableManager: TempVariableManager = delegate.tempVariableManager

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -253,19 +252,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
// file source v2 does not support streaming yet.
classOf[FileDataSourceV2].isAssignableFrom(cls)

val optionsWithPath = if (path.isEmpty) {
extraOptions
} else {
extraOptions + ("path" -> path.get)
}
val optionsWithPath = DataSourceV2Utils.getOptionsWithPaths(extraOptions, path.toSeq: _*)

val sink = if (classOf[TableProvider].isAssignableFrom(cls) && !useV1Source) {
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = ds.sparkSession.sessionState.conf)
val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++
optionsWithPath.originalMap
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
val dsOptions = DataSourceV2Utils.buildDsOptions(
provider, ds.sparkSession.sessionState.conf, optionsWithPath)
// If the source accepts external table metadata, here we pass the schema of input query
// to `getTable`. This is for avoiding schema inference, which can be very expensive.
// If the query schema is not compatible with the existing data, the behavior is undefined.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.util.UUID
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import javax.annotation.concurrent.GuardedBy

import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path
Expand All @@ -32,22 +31,21 @@ import org.apache.spark.internal.LogKeys.EXTENDED_EXPLAIN_GENERATOR
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row}
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubqueryAliases, LazyExpression, NameParameterizedQuery, UnresolvedRelation, UnsupportedOperationChecker}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, LazyExpression, NameParameterizedQuery, UnsupportedOperationChecker}
import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, TransactionalWrite => TransactionalWritePlan, Union, UnresolvedWith, WithCTE}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, Union, UnresolvedWith, WithCTE}
import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.catalyst.transactions.TransactionUtils
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, LookupCatalog, SupportsCatalogOptions, TableCatalog, TransactionalCatalogPlugin}
import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.transactions.Transaction
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ROOT_ID_KEY
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, TransactionalExec, V2TableRefreshUtil}
import org.apache.spark.sql.execution.datasources.v2.{TransactionalExec, V2TableRefreshUtil}
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
Expand All @@ -56,7 +54,6 @@ import org.apache.spark.sql.execution.streaming.runtime.{IncrementalExecution, W
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.scripting.SqlScriptingExecution
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LazyTry, Utils, UUIDv7Generator}
import org.apache.spark.util.ArrayImplicits._

Expand Down Expand Up @@ -119,16 +116,9 @@ class QueryExecution(
analyzerOpt.flatMap(_.catalogManager.transaction).orElse {
// Only begin a new transaction for outer QEs that lead to execution.
if (mode != CommandExecutionMode.SKIP) {
def resolve(w: TransactionalWritePlan): Option[TransactionalCatalogPlugin] =
pathBased(w) match {
case Some(c: TransactionalCatalogPlugin) => Some(c)
case Some(_) => None
// If the path is not data source based, fallback to catalog based resolution.
case None => TransactionalWrite.unapply(w)
}
val catalog = logical match {
case UnresolvedWith(w: TransactionalWritePlan, _, _) => resolve(w)
case w: TransactionalWritePlan => resolve(w)
case UnresolvedWith(TransactionalWrite(c), _, _) => Some(c)
case TransactionalWrite(c) => Some(c)
case _ => None
}
catalog.map(TransactionUtils.beginTransaction)
Expand All @@ -139,34 +129,6 @@ class QueryExecution(
}
private def transactionOpt: Option[Transaction] = lazyTransactionOpt.get

// For path-based tables (e.g. `format.`/path/to/table``) the first identifier part is a
// connector name. SupportsCatalogOptions on the connector tells us which catalog actually
// owns the table. Returns Some(catalog) if parts.head is a recognized SupportsCatalogOptions
// data source (caller decides whether the catalog is transactional), or None to fall through
// to the catalog-based extractor.
private def pathBased(write: TransactionalWritePlan): Option[TableCatalog] =
EliminateSubqueryAliases(write.table) match {
case UnresolvedRelation(parts, _, _) if parts.length > 1 =>
try {
DataSource.lookupDataSourceV2(parts.head, sparkSession.sessionState.conf)
.collect { case sco: SupportsCatalogOptions => sco }
.map { sco =>
val sessionConfigs = DataSourceV2Utils.extractSessionConfigs(
sco, sparkSession.sessionState.conf)
// Pass the entire identifier as option. The connector can decide how to parse it
// if needed.
val options = sessionConfigs + ("identifier" -> parts.mkString("."))
CatalogV2Util.getTableProviderCatalog(
sco, catalogManager, new CaseInsensitiveStringMap(options.asJava))
}
} catch {
// The head of the multipart identifier is not a registered data source.
// Fallback to catalog-based detection.
case _: ClassNotFoundException => None
}
case _ => None
}

// Per-query analyzer: uses a transaction-aware CatalogManager when a transaction is active,
// so that all catalog lookups and rule applications during analysis see the correct state
// without relying on thread-local context. Any nested QueryExecution that is created during
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.TimeTravelSpec
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSupport, StagedTable, StagingTableCatalog, SupportsCatalogOptions, SupportsRead, Table, TableProvider}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, DataSourceCatalogResolver, Identifier, SessionConfigSupport, StagedTable, StagingTableCatalog, SupportsCatalogOptions, SupportsRead, Table, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SQLExecution
Expand Down Expand Up @@ -99,6 +99,60 @@ private[sql] object DataSourceV2Utils extends Logging {
}
}

/**
* Builds the [[CaseInsensitiveStringMap]] passed to a v2 [[TableProvider]]: session configs
* extracted from the provider, merged with the caller-supplied options-with-path map.
*/
def buildDsOptions(
provider: TableProvider,
conf: SQLConf,
optionsWithPath: CaseInsensitiveMap[String]): CaseInsensitiveStringMap = {
val sessionOptions = extractSessionConfigs(provider, conf)
val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++
optionsWithPath.originalMap
new CaseInsensitiveStringMap(finalOptions.asJava)
}

/**
* Extracts the catalog name and connector-canonical identifier from a
* [[SupportsCatalogOptions]] provider. Shared by all SCO entry points (DataFrame reader, SQL
* multipart-name resolution) so they observe identical null-handling semantics:
* `extractCatalog` returning null falls back to the session catalog, matching
* [[CatalogV2Util.getTableProviderCatalog]].
*/
def extractCatalogAndIdentifier(
provider: SupportsCatalogOptions,
dsOptions: CaseInsensitiveStringMap): (String, Identifier) = {
val ident = provider.extractIdentifier(dsOptions)
val catalogName = Option(provider.extractCatalog(dsOptions))
.getOrElse(CatalogManager.SESSION_CATALOG_NAME)
(catalogName, ident)
}

/**
* Resolver bound to a session [[SQLConf]] that maps a multipart SQL identifier
* (e.g. `pathformat.\`/path/to/t\``) to a `(catalogName, identifier)` pair when the head
* names a registered [[SupportsCatalogOptions]] data source. Returns `None` for non-SCO
* sources or unknown format heads, letting the caller fall back to standard catalog
* resolution.
*/
def supportsCatalogOptionsResolver(conf: SQLConf): DataSourceCatalogResolver =
(nameParts: Seq[String]) =>
try {
DataSource.lookupDataSourceV2(nameParts.head, conf).flatMap {
case sco: SupportsCatalogOptions =>
val optionsWithPath = getOptionsWithPaths(
CaseInsensitiveMap(Map.empty), nameParts.tail: _*)
val dsOptions = buildDsOptions(sco, conf, optionsWithPath)
Some(extractCatalogAndIdentifier(sco, dsOptions))
case _ => None
}
} catch {
// The format name is not a registered data source. Fall through and let the caller
// treat it as a catalog/namespace name.
case _: ClassNotFoundException => None
}

def loadV2Source(
sparkSession: SparkSession,
provider: TableProvider,
Expand All @@ -108,23 +162,16 @@ private[sql] object DataSourceV2Utils extends Logging {
paths: String*): Option[LogicalPlan] = {
val catalogManager = sparkSession.sessionState.catalogManager
val conf = sparkSession.sessionState.conf
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(provider, conf)

val optionsWithPath = getOptionsWithPaths(extraOptions, paths: _*)

val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++
optionsWithPath.originalMap
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
val dsOptions = buildDsOptions(provider, conf, optionsWithPath)
val (table, catalog, ident, timeTravelSpec) = provider match {
case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty =>
throw new IllegalArgumentException(
s"$source does not support user specified schema. Please don't specify the schema.")
case hasCatalog: SupportsCatalogOptions =>
val ident = hasCatalog.extractIdentifier(dsOptions)
val catalog = CatalogV2Util.getTableProviderCatalog(
hasCatalog,
catalogManager,
dsOptions)
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val (catalogName, ident) = extractCatalogAndIdentifier(hasCatalog, dsOptions)
val catalog = catalogManager.catalog(catalogName).asTableCatalog

val version = hasCatalog.extractTimeTravelVersion(dsOptions)
val timestamp = hasCatalog.extractTimeTravelTimestamp(dsOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.classic.{SparkSession, Strategy, StreamingCheckpointManager, StreamingQueryManager, UDFRegistration}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.{CatalogManager, DataSourceCatalogResolver}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder
import org.apache.spark.sql.execution.aggregate.{ResolveEncodersInScalaAgg, ScalaUDAF}
import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin
import org.apache.spark.sql.execution.command.{CheckViewReferences, CommandCheck}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.{TableCapabilityCheck, V2SessionCatalog}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, TableCapabilityCheck, V2SessionCatalog}
import org.apache.spark.sql.execution.externalUDF.{ClassicExternalUDFPlanner,
ExternalUDFPlanner, UnifiedExternalUDFPlanner}
import org.apache.spark.sql.execution.streaming.runtime.ResolveWriteToStream
Expand Down Expand Up @@ -162,8 +162,11 @@ abstract class BaseSessionStateBuilder(

protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog)

protected lazy val dataSourceCatalogResolver: DataSourceCatalogResolver =
DataSourceV2Utils.supportsCatalogOptionsResolver(conf)

protected lazy val catalogManager = {
val cm = new CatalogManager(v2SessionCatalog, catalog)
val cm = new CatalogManager(v2SessionCatalog, catalog, dataSourceCatalogResolver)
parentState.foreach(ps => cm.copySessionPathFrom(ps.catalogManager))
cm
}
Expand Down
Loading