Add constant support in PrestoIterativePartitioningSerializer #1875
Conversation
| nullableValues<int64_t>(r1, 0),
| testing::UnorderedElementsAre(std::nullopt, 7, 7));
| }
Can you please add a test that mixes a CONSTANT append with a FLAT append for the same column? With and without nulls.
|
@xin-zhang2 Can you please add it to the benchmark?
Force-pushed from f45957b to f61f509.
| friend class AppendWindow;
| };
| /// A scoped wrapper that provides 'size' T's of writable space in 'stream'.
Actually, the output pointer returned by the AppendWindow is not guaranteed to be aligned to alignof(T), so we would have to use folly::storeUnaligned. This makes the caller more complicated and prevents using std::fill() directly.
Let's not use this approach and add an appendN() API to ByteOutputStream:
template <typename T>
void appendN(T value, int32_t count, Scratch& scratch) {
  if (count <= 0) {
    return;
  }
  AppendWindow<T> window(*this, scratch);
  // window.get() returns a T*, but the pointer may still be unaligned.
  auto* out = window.get(count);
  if constexpr (sizeof(T) == 1) {
    std::memset(out, static_cast<uint8_t>(value), count);
  } else if (reinterpret_cast<uintptr_t>(out) % alignof(T) == 0) {
    std::fill(out, out + count, value);
  } else {
    // Byte-wise arithmetic: advancing a T* by i * sizeof(T) would overshoot.
    auto* bytes = reinterpret_cast<char*>(out);
    for (int32_t i = 0; i < count; ++i) {
      folly::storeUnaligned<T>(bytes + i * sizeof(T), value);
    }
  }
}
Then IOBufOutputStream can forward:
template <typename T>
void appendN(T value, int32_t count, Scratch& scratch) {
out_->appendN<T>(value, count, scratch);
if (listener_ != nullptr && count > 0) {
// listener notification policy here
}
}
Make this the first commit in this PR, with a test added to ByteStreamTest.
Then the second commit should be the constant vector support that uses appendN.
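To sanity-check the alignment handling the suggested appendN relies on, here is a self-contained sketch; fillMaybeUnaligned is a hypothetical name, and std::memcpy stands in for folly::storeUnaligned (both are well-defined for any alignment):

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>

// Fills 'count' copies of 'value' starting at a byte pointer that may
// not satisfy alignof(T). Aligned inputs take the fast std::fill path;
// unaligned inputs fall back to byte-wise copies.
template <typename T>
void fillMaybeUnaligned(uint8_t* out, T value, int32_t count) {
  if (reinterpret_cast<uintptr_t>(out) % alignof(T) == 0) {
    auto* typed = reinterpret_cast<T*>(out);
    std::fill(typed, typed + count, value);
  } else {
    for (int32_t i = 0; i < count; ++i) {
      // std::memcpy is the portable equivalent of folly::storeUnaligned.
      std::memcpy(out + i * static_cast<int32_t>(sizeof(T)), &value, sizeof(T));
    }
  }
}
```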
@yingsu00
Thanks for the suggestion!
After thinking about it again, I'm a bit concerned that AppendWindow might not be the best choice here.
AppendWindow relies on ByteOutputStream, which stores data in a vector of ByteRanges. As a result, the data may be split into different ranges and is therefore not contiguous. This doesn't work well with the listener notification, since listener.onWrite requires a contiguous input buffer.
virtual void onWrite(const char* /* s */, std::streamsize /* count */)
As a result, we would have to call onWrite multiple times like
if (listener_ != nullptr && count > 0) {
const char* p = reinterpret_cast<const char*>(&value);
for (int32_t i = 0; i < count; ++i) {
listener_->onWrite(p, sizeof(T));
}
}
What do you think about the approach in #1875 (comment)?
The approach in #1875 (comment) uses a temporary buffer to hold all the values. We can also use a fixed-size buffer to limit the memory usage, for example
constexpr int32_t kChunkBytes = 4096;
auto chunkRows = std::max<vector_size_t>(1, kChunkBytes / sizeof(T));
auto bufferedRows = std::min<vector_size_t>(numRows, chunkRows);
ScratchPtr<T> values(scratch);
auto* ptr = values.get(bufferedRows);
std::fill_n(ptr, bufferedRows, value);
auto* bytes = reinterpret_cast<const char*>(ptr);
vector_size_t remaining = numRows;
while (remaining > 0) {
auto n = std::min(remaining, chunkRows);
outputStreams[p]->write(bytes, n * sizeof(T));
remaining -= n;
}
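The chunked-write pattern above can be exercised standalone. This sketch writes into a std::string in place of the serializer's real output stream; writeConstant, the stream type, and kChunkBytes here are illustrative assumptions, not the PR's API:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

using vector_size_t = int32_t; // Velox's row index type.

// Appends 'numRows' copies of 'value' to 'out' through a fixed-size
// staging buffer, so memory usage stays bounded by kChunkBytes.
template <typename T>
void writeConstant(std::string& out, T value, vector_size_t numRows) {
  constexpr int32_t kChunkBytes = 4096;
  const auto chunkRows = std::max<vector_size_t>(1, kChunkBytes / sizeof(T));
  // The buffer is filled once and reused for every chunk.
  std::vector<T> buffer(
      static_cast<size_t>(std::min<vector_size_t>(numRows, chunkRows)), value);
  const auto* bytes = reinterpret_cast<const char*>(buffer.data());
  vector_size_t remaining = numRows;
  while (remaining > 0) {
    const auto n = std::min(remaining, chunkRows);
    out.append(bytes, static_cast<size_t>(n) * sizeof(T));
    remaining -= n;
  }
}
```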
| AppendWindow(ByteOutputStream& stream, Scratch& scratch)
|     : stream_(stream), scratchPtr_(scratch) {}
| AppendWindow(IOBufOutputStream& stream, Scratch& scratch)
Btw, even if we still use AppendWindow, it's better to add a getAppendWindow() API to IOBufOutputStream that calls out_->getAppendWindow(). That way you don't have to expose the internals of IOBufOutputStream.
Force-pushed from 57c7447 to d375a33.
Force-pushed from d375a33 to 70ad998.
Force-pushed from 7b744ef to 90aaa09.
Force-pushed from 70ad998 to 211901c.
yingsu00 left a comment:
@xin-zhang2 Could you please squash the benchmark with the previous commit?
| template <>
| void PrestoIterativePartitioningSerializer::flushSingleConstantVector<
|     TypeKind::BOOLEAN>(
Do we really need this specialization? Is it possible to do it in place in the generic one? The only differences are 1) the value width and 2) the value.
If we make them variables, can it be done?
No, the specialization is not necessary and I've removed it.
Actually, the generic implementation works correctly for the BOOLEAN type as long as sizeof(bool) == 1 holds.
According to the C++ standard, sizeof(bool) is implementation-defined, but in practice it almost always equals 1, and the codebase already makes this assumption. For example, in estimateFlattenedConstantSerializedSize, sizeof(T) is used as the element size and there's no specialization for BOOLEAN either.
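If we want to keep relying on that property, the assumption could be pinned down with a compile-time check; a defensive sketch, not part of the PR:

```cpp
// Guard the generic serializer path's assumption that a bool occupies
// one byte, so a hypothetical platform where it does not fails to
// compile instead of producing a corrupted wire format.
static_assert(sizeof(bool) == 1, "serializer assumes a 1-byte bool");
```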
Force-pushed from c4ac386 to 005eaa2.
| startBit = partitionOffsets[p];
| auto encoding = pv->baseVector()->encoding();
| switch (encoding) {
| case VectorEncoding::Simple::FLAT:
Other than CONSTANT, the other simple-encoded vectors should flush nulls the same way.
Yes, the wire format should be the same for those encodings, and we can add the support once they are implemented, in case any specific logic is required. For example, we might need to consider the additional null information from the parent if the column is a child of a row-encoded column.
| for (uint32_t p : nonEmptyPartitions) {
| vector_size_t numBits = rawPartitionOffsets[p] - startBit;
| if (isNullConstant && numBits > 0 && !bitmaps[p].empty()) {
| bits::fillBits(
bits::fillBits is slow. There are just two values, 0 and 1. We can use memset directly.
@yingsu00
bitmaps[p] is not guaranteed to be byte-aligned, so we would need to handle the unaligned cases if we switched to memset. That would end up similar to bits::fillBits, which operates on the bitmap word-wise and handles the unaligned boundaries.
The implementation of bits::fillBits is like
if (begin != firstWord) {
partialWordFunc(begin / 64, highMask(firstWord - begin));
}
for (int32_t i = firstWord; i + 64 <= lastWord; i += 64) {
fullWordFunc(i / 64);
}
if (end != lastWord) {
partialWordFunc(lastWord / 64, lowMask(end - lastWord));
}
where fullWordFunc is [bits, value](int32_t idx) { bits[idx] = value ? -1 : 0; }
I'm not sure it is worth switching to memset here; perhaps we can keep bits::fillBits for now.
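For reference, a minimal self-contained version of that word-wise pattern; fillBitsSketch and lowMask are illustrative names, simplified from what bits::fillBits does rather than the actual velox::bits API:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Mask with the low 'n' bits set; 'n' in [0, 64].
inline uint64_t lowMask(int32_t n) {
  return n >= 64 ? ~0ULL : (1ULL << n) - 1;
}

// Sets or clears bits [begin, end) in 'bits': masked updates for the
// partial words at the boundaries, whole 64-bit stores in between.
inline void fillBitsSketch(
    uint64_t* bits, int32_t begin, int32_t end, bool value) {
  auto applyMask = [&](int32_t word, uint64_t mask) {
    bits[word] = value ? (bits[word] | mask) : (bits[word] & ~mask);
  };
  const int32_t firstWord = (begin + 63) / 64 * 64; // boundary >= begin
  const int32_t lastWord = end / 64 * 64;           // boundary <= end
  if (firstWord > lastWord) {
    // begin and end fall within the same word.
    const int32_t base = begin / 64 * 64;
    applyMask(begin / 64, lowMask(end - base) & ~lowMask(begin - base));
    return;
  }
  if (begin != firstWord) {
    applyMask(begin / 64, ~lowMask(begin % 64)); // partial leading word
  }
  for (int32_t i = firstWord; i < lastWord; i += 64) {
    bits[i / 64] = value ? ~0ULL : 0; // fully covered words
  }
  if (end != lastWord) {
    applyMask(end / 64, lowMask(end % 64)); // partial trailing word
  }
}
```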
Ok, let's keep using it now.
Force-pushed from 005eaa2 to f420bdc.
| const auto* partitionOffsets = partitionedVector->rawPartitionOffsets();
| Scratch scratch;
| auto numChunkRows = std::max<vector_size_t>(1, kChunkBytes / sizeof(T));
numChunkRows -> numRowsPerChunk
| auto bufferedRows = std::min<vector_size_t>(numRows, numChunkRows);
| ScratchPtr<T> values(scratch);
| auto* ptr = values.get(bufferedRows);
| std::fill_n(ptr, bufferedRows, value);
No need to fill in every batch. Hoist the Scratch allocation and the fill_n out of the loop; the buffer can be reused across partitions.
Moved the definition of chunkBytes outside the loop so it can be reused across partitions. It is lazily initialized when the first partition with numRows > 0 is processed.
…tioningSerializer
Force-pushed from f420bdc to 8d9c480.
Merged 627bf5d into IBM:optimized_partitionedoutput.
No description provided.