Fix BloomFilter buffer incompatibility between Spark and Comet #3003
Conversation
Thanks @Shekharrajak. Looks like you need to run

Done. Please help in triggering the workflow. Thanks!

@Shekharrajak there are compilation errors

@andygrove, it now looks fine locally. Is there a way to run all the workflow checks locally, so we can make sure everything is fine before running the workflow on GitHub?
Codecov Report

✅ All modified and coverable lines are covered by tests.

@@            Coverage Diff             @@
##               main    #3003      +/-   ##
============================================
+ Coverage     56.12%   59.55%    +3.43%
- Complexity      976     1379      +403
============================================
  Files           119      167       +48
  Lines         11743    15496     +3753
  Branches       2251     2569      +318
============================================
+ Hits           6591     9229     +2638
- Misses         4012     4970      +958
- Partials       1140     1297      +157

View full report in Codecov by Sentry.
I think in the compilation error case that should be pretty reproducible locally. I definitely recommend running

In the absence of any new tests, it feels like we should be relaxing a fallback constraint in operators.scala or modifying existing tests to exercise this behavior. Otherwise I suspect we're still falling back. @andygrove, do you recall where we might want to make changes to test this behavior?
I think this is the condition: https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala#L1074 |
mbutrovich left a comment:
Thanks @Shekharrajak! Can you remove any relevant fallbacks and/or modify tests so we know that we're exercising this behavior?
Handle Spark's full serialization format (12-byte header + bits data) in merge_filter() to support Spark partial / Comet final execution. The fix automatically detects the format and extracts the bits data accordingly.
Fixes #2889
Rationale for this change

- Spark's serialize() returns the full format: a 12-byte header (version + numHashFunctions + numWords) followed by the bits data.
- Comet's state_as_bytes() returns the bits data only.
- When a Spark partial aggregate sends the full format, Comet's merge_filter() expects bits-only data, causing a mismatch.

Ref: https://github.com/apache/spark/blob/master/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java#L99
Ref: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala#L219

Spark format: BloomFilterImpl.writeTo() (4 + 4 bytes) + BitArray.writeTo() (4 bytes + bits data)
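As a hedged sketch of that layout (assuming Java's big-endian DataOutputStream encoding; the function name here is illustrative, not Spark's actual API):

```rust
// Illustrative reconstruction of Spark's serialized BloomFilter layout:
//   BloomFilterImpl.writeTo(): 4-byte version + 4-byte numHashFunctions
//   BitArray.writeTo():        4-byte numWords + numWords * 8 bytes of bit data
// All integers are big-endian, as written by Java's DataOutputStream.
fn spark_bloom_filter_bytes(num_hash_functions: i32, words: &[i64]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(12 + words.len() * 8);
    buf.extend_from_slice(&1i32.to_be_bytes()); // version (V_1 = 1)
    buf.extend_from_slice(&num_hash_functions.to_be_bytes());
    buf.extend_from_slice(&(words.len() as i32).to_be_bytes());
    for w in words {
        buf.extend_from_slice(&w.to_be_bytes()); // bits data
    }
    buf
}

fn main() {
    // A filter with 3 hash functions and a 2-word bit array:
    let buf = spark_bloom_filter_bytes(3, &[0x1, 0x2]);
    // 12-byte header + 2 words * 8 bytes each
    assert_eq!(buf.len(), 12 + 16);
    println!("total serialized length: {}", buf.len());
}
```

This is where the fixed "12 + expected bits size" in the detection below comes from: the header contributes exactly 12 bytes regardless of filter size.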
What changes are included in this PR?

- Detects the Spark format (buffer size = 12 + expected bits size)
- Extracts the bits data by skipping the 12-byte header if it is the Spark format
- Returns the bits data as-is if it is the Comet format
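The steps above can be sketched as follows (a minimal sketch: `extract_bits` and its signature are illustrative, not Comet's actual merge_filter() API; `expected_bits_len` stands for the byte length of this filter's bit array):

```rust
/// Number of header bytes in Spark's full serialization:
/// version + numHashFunctions + numWords, 4 bytes each.
const SPARK_HEADER_LEN: usize = 12;

/// Return the bits-only portion of `buf`, whichever format it arrived in.
fn extract_bits(buf: &[u8], expected_bits_len: usize) -> Result<&[u8], String> {
    if buf.len() == SPARK_HEADER_LEN + expected_bits_len {
        // Spark's full serialization: skip the 12-byte header.
        Ok(&buf[SPARK_HEADER_LEN..])
    } else if buf.len() == expected_bits_len {
        // Comet's state_as_bytes(): already bits-only.
        Ok(buf)
    } else {
        Err(format!(
            "unexpected BloomFilter buffer length {}, expected {} or {}",
            buf.len(),
            expected_bits_len,
            SPARK_HEADER_LEN + expected_bits_len
        ))
    }
}

fn main() {
    let spark_buf = vec![0u8; 12 + 16]; // full format, 2-word bit array
    let comet_buf = vec![0u8; 16]; // bits-only format
    assert_eq!(extract_bits(&spark_buf, 16).unwrap().len(), 16);
    assert_eq!(extract_bits(&comet_buf, 16).unwrap().len(), 16);
    assert!(extract_bits(&[0u8; 10], 16).is_err());
    println!("format detection ok");
}
```

Note the detection is purely size-based: for a fixed expected bits length the two candidate buffer lengths always differ by exactly 12 bytes, so the check is unambiguous.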
How are these changes tested?
Spark SQL test