[SPARK-56961][SQL] Pass all options while loading changelog #56044
aokolnychyi wants to merge 3 commits into
 *
 * @since 4.2.0
 */
@Evolving
public class ChangelogInfo {
I think we should rename this, given that we will have to pass options to loadTable as well. We already have TableInfo, which we use for CREATE TABLE cases, and we will NOT be able to use TableInfo for loading tables. So the name ChangelogInfo conflicts with TableInfo: this class is used for loading, not creation.
Another alternative name to consider is ChangelogParameters, but ChangelogContext seems a better fit.
Thoughts, @gengliangwang @cloud-fan @johanl-db?
My proposal would be:
loadTable(ident, tableContext (timeTravelSpec, writePrivileges), options)
- by default delegates to existing loadTable() methods
loadChangelog(ident, changelogContext (range, deduplicationMode, etc), options)
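A minimal sketch of the proposed shape, assuming simplified stand-in types: `SketchCatalog`, `TableContext`, `ChangelogContext`, and `LoadApiSketch` below are hypothetical classes written for illustration, not the real Spark connector API, and identifiers and results are plain strings. It only shows how a default `loadTable(ident, context, options)` could delegate to an existing `loadTable(ident)`.

```java
import java.util.Map;

// Hypothetical stand-ins for illustration; these are NOT the real Spark connector classes.
final class TableContext {
  final String timeTravelSpec; // null when no time travel is requested

  TableContext(String timeTravelSpec) {
    this.timeTravelSpec = timeTravelSpec;
  }
}

final class ChangelogContext {
  final String startingVersion;
  final String endingVersion;

  ChangelogContext(String startingVersion, String endingVersion) {
    this.startingVersion = startingVersion;
    this.endingVersion = endingVersion;
  }
}

interface SketchCatalog {
  // Existing-style method that takes no context and no options.
  String loadTable(String ident);

  // Proposed shape: the default implementation delegates to the existing method,
  // so catalogs that ignore the context and options keep working unchanged.
  default String loadTable(String ident, TableContext context, Map<String, String> options) {
    return loadTable(ident);
  }

  // Proposed shape for changelogs: the parsed context plus the raw options map.
  String loadChangelog(String ident, ChangelogContext context, Map<String, String> options);
}

final class LoadApiSketch {
  // Builds a toy catalog that only overrides the existing-style loadTable.
  static SketchCatalog catalog() {
    return new SketchCatalog() {
      @Override
      public String loadTable(String ident) {
        return "table:" + ident;
      }

      @Override
      public String loadChangelog(
          String ident, ChangelogContext context, Map<String, String> options) {
        return "changelog:" + ident + "[" + context.startingVersion + ".."
            + context.endingVersion + "] options=" + options.size();
      }
    };
  }

  public static void main(String[] args) {
    SketchCatalog catalog = catalog();
    Map<String, String> options = Map.of("customOption", "customValue");
    System.out.println(catalog.loadTable("t", new TableContext(null), options));
    System.out.println(catalog.loadChangelog("t", new ChangelogContext("1", "5"), options));
  }
}
```

With this shape, a connector that needs the raw options overrides the three-argument methods, while other catalogs inherit the delegating default.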
We concluded that passing all options to load methods for tables and changelog is a requirement for external connectors like Delta and Iceberg.
cc @huaxingao, this is one of the items we discussed on the dev list for 4.2
…log/TableCatalog.java Co-authored-by: Gengliang Wang <gengliang@apache.org>
…ysis/RelationChanges.scala Co-authored-by: Gengliang Wang <gengliang@apache.org>
cloud-fan left a comment
This PR does two things: renames ChangelogInfo → ChangelogContext (and the associated ChangelogInfoUtils + the changelogInfo field on RelationChanges/ChangelogTable), and adds a CaseInsensitiveStringMap options parameter to TableCatalog.loadChangelog so connectors like Iceberg/Delta receive the full raw options map.
The rename is anticipatory: per @aokolnychyi's inline thread, the planned shape is loadTable(ident, tableContext, options) and loadChangelog(ident, changelogContext, options). Using ChangelogContext avoids the conflict with TableInfo (which is for CREATE).
Options-flow design: all four frontends (AstBuilder, DataFrameReader, DataStreamReader, SparkConnectPlanner) build a single CaseInsensitiveStringMap and pass it to both the UnresolvedRelation and the ChangelogContext builder. RelationResolution.resolveChangelog then forwards u.options to loadChangelog — so options stay as a single source of truth on UnresolvedRelation rather than being duplicated on RelationChanges. Reasonable.
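The flow described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's code: a `TreeMap` with `String.CASE_INSENSITIVE_ORDER` stands in for `CaseInsensitiveStringMap`, `RecordingCatalog` and `OptionsFlowSketch` are hypothetical, and the `endingVersion` option key is assumed by analogy with the `startingVersion` key that appears in the test suggestion in this thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the parsed CDC settings.
final class ChangelogContext {
  final String startingVersion;
  final String endingVersion;

  ChangelogContext(String startingVersion, String endingVersion) {
    this.startingVersion = startingVersion;
    this.endingVersion = endingVersion;
  }

  // Parses only the recognized keys; everything else stays in the raw map.
  static ChangelogContext fromOptions(Map<String, String> options) {
    return new ChangelogContext(options.get("startingVersion"), options.get("endingVersion"));
  }
}

// Hypothetical catalog that records what it was called with.
final class RecordingCatalog {
  ChangelogContext lastContext;
  Map<String, String> lastOptions;

  void loadChangelog(String ident, ChangelogContext context, Map<String, String> options) {
    this.lastContext = context;
    this.lastOptions = options;
  }
}

final class OptionsFlowSketch {
  // Assumption: a case-insensitive TreeMap approximates CaseInsensitiveStringMap lookups.
  static Map<String, String> buildOptions(Map<String, String> userOptions) {
    Map<String, String> merged = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    merged.putAll(userOptions);
    return Collections.unmodifiableMap(merged);
  }

  // One map is built once, then used both to parse the context and as the raw options.
  static RecordingCatalog resolve(Map<String, String> userOptions) {
    Map<String, String> options = buildOptions(userOptions);
    ChangelogContext context = ChangelogContext.fromOptions(options);
    RecordingCatalog catalog = new RecordingCatalog();
    catalog.loadChangelog("t", context, options);
    return catalog;
  }

  // Fixed inputs mirroring the values used in the test snippets of this thread.
  static RecordingCatalog demo() {
    Map<String, String> user = new HashMap<>();
    user.put("startingVersion", "1");
    user.put("endingVersion", "5");
    user.put("customOption", "customValue");
    return resolve(user);
  }

  public static void main(String[] args) {
    RecordingCatalog catalog = demo();
    System.out.println(catalog.lastOptions);
    System.out.println(catalog.lastContext.startingVersion + ".." + catalog.lastContext.endingVersion);
  }
}
```

In this sketch the catalog sees both the custom key and the CDC-recognized keys in the raw map, which is the property the test-coverage comments in this review ask to pin down.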
LGTM overall — just a few test-coverage nits and an opportunistic naming cleanup inline.
val opts = cat.lastOptions
assert(opts.isDefined)
assert(opts.get.get("customOption") == "customValue")
The Javadoc on loadChangelog says options "include the CDC-recognized keys (range, deduplication mode, etc.) that are also parsed into context" — but this test only asserts the custom key is forwarded. Worth also asserting a CDC-recognized key is present so the Javadoc claim is pinned by a test:
assert(opts.get.get("customOption") == "customValue")
assert(opts.get.get("startingVersion") == "1")
    assert(range.endingVersion().get() == "5")
  }

  test("user-defined options are forwarded to loadChangelog") {
This test covers the DataFrame batch path, but the SQL WITH-clause path (AstBuilder) and the streaming paths (DataStreamReader, streaming SQL) each independently construct the UnresolvedRelation.options in their own frontend before RelationResolution dispatches. Would you mind adding smoke tests for at least the SQL and streaming DataFrame paths so a future regression in any frontend is caught here? (The end-to-end suite touches lastChangelogContext for those paths but not lastOptions.)
  private def evaluateRequirements(
      changelog: Changelog,
-     options: ChangelogInfo): PostProcessingRequirements = {
+     options: ChangelogContext): PostProcessingRequirements = {
    val requiresCarryOverRemoval =
-     options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE &&
+     options.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE &&
        changelog.containsCarryoverRows()
    val requiresUpdateDetection =
      options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
    val requiresNetChanges =
-     options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES &&
+     options.deduplicationMode() == ChangelogContext.DeduplicationMode.NET_CHANGES &&
        changelog.containsIntermediateChanges()

    // If carry-overs are surfaced and update detection is enabled without carry-over
    // removal, carry-overs would be falsely classified as updates, leading to wrong
    // results. Hence we throw.
    if (requiresUpdateDetection &&
        changelog.containsCarryoverRows() &&
-       options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) {
+       options.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE) {
Now that options: CaseInsensitiveStringMap is a real distinct concept in this PR, naming a ChangelogContext value options here (and the doc just above — "user-provided ChangelogContext options") reads awkwardly. Since the PR is already updating every options. access in this method, mind renaming to context to match the rest of the rename?
  private def evaluateRequirements(
      changelog: Changelog,
      context: ChangelogContext): PostProcessingRequirements = {
    val requiresCarryOverRemoval =
      context.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE &&
        changelog.containsCarryoverRows()
    val requiresUpdateDetection =
      context.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
    val requiresNetChanges =
      context.deduplicationMode() == ChangelogContext.DeduplicationMode.NET_CHANGES &&
        changelog.containsIntermediateChanges()
    // If carry-overs are surfaced and update detection is enabled without carry-over
    // removal, carry-overs would be falsely classified as updates, leading to wrong
    // results. Hence we throw.
    if (requiresUpdateDetection &&
        changelog.containsCarryoverRows() &&
        context.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE) {
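For readers following the logic rather than the rename, the checks in this method can be sketched in a self-contained form. Everything below is a simplified stand-in: the classes use plain boolean fields instead of the real accessor methods, the enum lists only the two modes visible in the snippet, and `IllegalStateException` is an assumption because the thread does not show which exception the real code throws.

```java
// Hypothetical stand-ins for the types referenced in the review; not the real Spark classes.
enum DeduplicationMode { NONE, NET_CHANGES } // only the modes visible in the snippet

final class Changelog {
  final boolean containsCarryoverRows;
  final boolean representsUpdateAsDeleteAndInsert;
  final boolean containsIntermediateChanges;

  Changelog(boolean carryovers, boolean updateAsDeleteAndInsert, boolean intermediateChanges) {
    this.containsCarryoverRows = carryovers;
    this.representsUpdateAsDeleteAndInsert = updateAsDeleteAndInsert;
    this.containsIntermediateChanges = intermediateChanges;
  }
}

final class ChangelogContext {
  final DeduplicationMode deduplicationMode;
  final boolean computeUpdates;

  ChangelogContext(DeduplicationMode deduplicationMode, boolean computeUpdates) {
    this.deduplicationMode = deduplicationMode;
    this.computeUpdates = computeUpdates;
  }
}

final class PostProcessingRequirements {
  final boolean carryOverRemoval;
  final boolean updateDetection;
  final boolean netChanges;

  PostProcessingRequirements(boolean carryOverRemoval, boolean updateDetection, boolean netChanges) {
    this.carryOverRemoval = carryOverRemoval;
    this.updateDetection = updateDetection;
    this.netChanges = netChanges;
  }
}

final class RequirementsSketch {
  static PostProcessingRequirements evaluateRequirements(
      Changelog changelog, ChangelogContext context) {
    boolean requiresCarryOverRemoval =
        context.deduplicationMode != DeduplicationMode.NONE && changelog.containsCarryoverRows;
    boolean requiresUpdateDetection =
        context.computeUpdates && changelog.representsUpdateAsDeleteAndInsert;
    boolean requiresNetChanges =
        context.deduplicationMode == DeduplicationMode.NET_CHANGES
            && changelog.containsIntermediateChanges;

    // Carry-overs without carry-over removal would be misclassified as updates,
    // so this combination is rejected. The exception type is an assumption.
    if (requiresUpdateDetection
        && changelog.containsCarryoverRows
        && context.deduplicationMode == DeduplicationMode.NONE) {
      throw new IllegalStateException(
          "Update detection requires carry-over removal when carry-over rows are present");
    }
    return new PostProcessingRequirements(
        requiresCarryOverRemoval, requiresUpdateDetection, requiresNetChanges);
  }

  public static void main(String[] args) {
    PostProcessingRequirements r = evaluateRequirements(
        new Changelog(true, true, true),
        new ChangelogContext(DeduplicationMode.NET_CHANGES, true));
    System.out.println(r.carryOverRemoval + " " + r.updateDetection + " " + r.netChanges);
  }
}
```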
What changes were proposed in this pull request?
This PR passes all specified options while loading changelogs.
Why are the changes needed?
These changes are needed to make the API usable in connectors like Iceberg and Delta.
Does this PR introduce any user-facing change?
The functionality hasn't been released yet.
How was this patch tested?
This PR comes with tests.
Was this patch authored or co-authored using generative AI tooling?
Claude Code v2.1.145.