
Add constant support in PrestoIterativePartitioningSerializer#1875

Merged
yingsu00 merged 1 commit into IBM:optimized_partitionedoutput from xin-zhang2:PartitionedOutput3.0-constant
Apr 22, 2026

Conversation

@xin-zhang2
Member

No description provided.

@xin-zhang2 xin-zhang2 requested a review from yingsu00 April 1, 2026 15:20
@xin-zhang2 xin-zhang2 changed the title feat(PartitionedOutput): Add constant support in PrestoIterativeParti… feat(PartitionedOutput): Add constant support in PrestoIterativePartitioningSerializer Apr 1, 2026
@xin-zhang2 xin-zhang2 changed the title feat(PartitionedOutput): Add constant support in PrestoIterativePartitioningSerializer Add constant support in PrestoIterativePartitioningSerializer Apr 1, 2026
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
nullableValues<int64_t>(r1, 0),
testing::UnorderedElementsAre(std::nullopt, 7, 7));
}

Collaborator

Can you please add a test that mixes a CONSTANT append with a FLAT append for the same column? With and without nulls.

Comment thread velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp Outdated
@yingsu00
Collaborator

yingsu00 commented Apr 3, 2026

@xin-zhang2 Can you please add it to the benchmark?

@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput3.0-constant branch from f45957b to f61f509 on April 9, 2026 15:45
friend class AppendWindow;
};

/// A scoped wrapper that provides 'size' T's of writable space in 'stream'.
Collaborator

Actually, the output pointer returned by the AppendWindow is not guaranteed to be aligned to alignof(T), so we have to use folly::storeUnaligned. This makes the caller more complicated and means it cannot use std::fill() directly.

Let's not use this approach; instead, let's add an appendN() API to ByteOutputStream:

template <typename T>
void appendN(T value, int32_t count, Scratch& scratch) {
  if (count <= 0) {
    return;
  }

  AppendWindow<T> window(*this, scratch);
  // Treat the window as raw bytes so the unaligned path below can use
  // byte-offset pointer arithmetic.
  auto* out = reinterpret_cast<char*>(window.get(count));

  if constexpr (sizeof(T) == 1) {
    std::memset(out, static_cast<uint8_t>(value), count);
  } else if ((reinterpret_cast<uintptr_t>(out) % alignof(T)) == 0) {
    auto* typed = reinterpret_cast<T*>(out);
    std::fill(typed, typed + count, value);
  } else {
    for (int32_t i = 0; i < count; ++i) {
      folly::storeUnaligned<T>(out + i * sizeof(T), value);
    }
  }
}

Then IOBufOutputStream can forward:

template <typename T>
void appendN(T value, int32_t count, Scratch& scratch) {
  out_->appendN<T>(value, count, scratch);
  if (listener_ != nullptr && count > 0) {
    // listener notification policy here
  }
}

Make this the first commit in this PR, with a test in ByteStreamTest.

Then the second commit should be the constant vector support that uses appendN.

Member Author

@yingsu00
Thanks for the suggestion!
After thinking about it again, I'm a bit concerned that AppendWindow might not be the best choice here.
AppendWindow relies on ByteOutputStream, which stores data in a vector of ByteRanges. As a result, the data may be split into different ranges and is therefore not contiguous. This doesn't work well with the listener notification, since listener.onWrite requires a contiguous input buffer.

virtual void onWrite(const char* /* s */, std::streamsize /* count */)

As a result, we would have to call onWrite multiple times like

if (listener_ != nullptr && count > 0) {
  const char* p = reinterpret_cast<const char*>(&value);
  for (int32_t i = 0; i < count; ++i) {
    listener_->onWrite(p, sizeof(T));
  }
}

What do you think about the approach in #1875 (comment)?

Member Author

The approach in #1875 (comment) uses a temporary buffer to hold all the values. We can also use a fixed-size buffer to limit the memory usage, for example

constexpr int32_t kChunkBytes = 4096;
auto chunkRows = std::max<vector_size_t>(1, kChunkBytes / sizeof(T));
auto bufferedRows = std::min<vector_size_t>(numRows, chunkRows);

ScratchPtr<T> values(scratch);
auto* ptr = values.get(bufferedRows);
std::fill_n(ptr, bufferedRows, value);
auto* bytes = reinterpret_cast<const char*>(ptr);

vector_size_t remaining = numRows;
while (remaining > 0) {
  auto n = std::min(remaining, chunkRows);
  outputStreams[p]->write(bytes, n * sizeof(T));
  remaining -= n;
}

Comment thread velox/common/memory/ByteStream.h Outdated
AppendWindow(ByteOutputStream& stream, Scratch& scratch)
: stream_(stream), scratchPtr_(scratch) {}

AppendWindow(IOBufOutputStream& stream, Scratch& scratch)
Collaborator

@yingsu00 yingsu00 Apr 12, 2026

Btw, even if we still use AppendWindow, it's better to add a getAppendWindow() API to IOBufOutputStream that calls out_->getAppendWindow(). That way you don't have to expose the internals of IOBufOutputStream.

@xin-zhang2 xin-zhang2 force-pushed the optimized_partitionedoutput branch from 57c7447 to d375a33 on April 14, 2026 16:49
@xin-zhang2 xin-zhang2 force-pushed the optimized_partitionedoutput branch from d375a33 to 70ad998 on April 15, 2026 10:02
@xin-zhang2 xin-zhang2 removed the request for review from majetideepak April 16, 2026 09:09
@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput3.0-constant branch 2 times, most recently from 7b744ef to 90aaa09 on April 16, 2026 12:50
@xin-zhang2 xin-zhang2 force-pushed the optimized_partitionedoutput branch from 70ad998 to 211901c on April 17, 2026 12:12
Collaborator

@yingsu00 yingsu00 left a comment

@xin-zhang2 Could you please squash the benchmark with the previous commit?

Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated

template <>
void PrestoIterativePartitioningSerializer::flushSingleConstantVector<
TypeKind::BOOLEAN>(
Collaborator

Do we really need this specialization? Is it possible to do it in place in the generic one? The only differences are 1) the value width and 2) the value.
If we make them variables, can it be done?

Member Author

No, the specialization is not necessary and I've removed it.
Actually, the generic implementation works correctly for the BOOLEAN type, as long as sizeof(bool) == 1 holds.

According to the C++ standard, sizeof(bool) is implementation-defined, but in practice it is almost always 1, and it seems the codebase has already made this assumption. For example, in estimateFlattenedConstantSerializedSize, sizeof(T) is used as the element size and there's no specialization for BOOLEAN either.

@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput3.0-constant branch 2 times, most recently from c4ac386 to 005eaa2 on April 20, 2026 13:55
startBit = partitionOffsets[p];
auto encoding = pv->baseVector()->encoding();
switch (encoding) {
case VectorEncoding::Simple::FLAT:
Collaborator

Other than CONSTANT, the other simple-encoded vectors should flush nulls the same way.

Member Author

Yes, the wire format should be the same for those encodings, and we can add the support once they are implemented, in case any encoding-specific logic is required. For example, we might need to consider the additional null information from the parent if the column is a child of a row-encoded column.

Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
for (uint32_t p : nonEmptyPartitions) {
vector_size_t numBits = rawPartitionOffsets[p] - startBit;
if (isNullConstant && numBits > 0 && !bitmaps[p].empty()) {
bits::fillBits(
Collaborator

bits::fillBits is slow. There are just 2 values, 0 and 1. We can use memset directly.

Member Author

@yingsu00
bitmaps[p] is not guaranteed to be byte-aligned, so we would need to handle the non-aligned boundaries ourselves if we switch to memset. This would be similar to bits::fillBits, which operates on the bitmap word-wise and handles the unaligned boundaries.

The implementation of bits::fillBits is like

  if (begin != firstWord) {
    partialWordFunc(begin / 64, highMask(firstWord - begin));
  }
  for (int32_t i = firstWord; i + 64 <= lastWord; i += 64) {
    fullWordFunc(i / 64);
  }
  if (end != lastWord) {
    partialWordFunc(lastWord / 64, lowMask(end - lastWord));
  }

where fullWordFunc is [bits, value](int32_t idx) { bits[idx] = value ? -1 : 0; }

I'm not sure it is worth switching to memset here; perhaps we can keep bits::fillBits for now.

Collaborator

Ok, let's keep using it now.

@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput3.0-constant branch from 005eaa2 to f420bdc on April 20, 2026 17:18
@xin-zhang2 xin-zhang2 requested a review from yingsu00 April 20, 2026 17:19
const auto* partitionOffsets = partitionedVector->rawPartitionOffsets();

Scratch scratch;
auto numChunkRows = std::max<vector_size_t>(1, kChunkBytes / sizeof(T));
Collaborator

numChunkRows -> numRowsPerChunk

auto bufferedRows = std::min<vector_size_t>(numRows, numChunkRows);
ScratchPtr<T> values(scratch);
auto* ptr = values.get(bufferedRows);
std::fill_n(ptr, bufferedRows, value);
Collaborator

No need to fill it in every batch. Hoist the Scratch allocation and the fill_n out of the loop; the buffer can be reused across partitions.

Member Author

Moved the definition of chunkBytes outside the loop so it can be reused across partitions. It is lazily initialized when the first partition with numRows > 0 is processed.

Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput3.0-constant branch from f420bdc to 8d9c480 on April 21, 2026 13:49
@yingsu00 yingsu00 merged commit 627bf5d into IBM:optimized_partitionedoutput Apr 22, 2026
4 of 5 checks passed