Fix HashStringAllocator::clear() and cumulativeBytes_
wypb committed Jun 27, 2024
1 parent d26cb1d commit 13106b4
Showing 3 changed files with 158 additions and 49 deletions.
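
In short, the fix makes cumulativeBytes_ account for whole blocks, payload plus header, and teaches clear() to walk every arena block and subtract the live ones before clearing the pool. A minimal stand-alone sketch of that invariant, using made-up stand-in names (FakeHeader, fakeBlockBytes), not the Velox types:

#include <cassert>
#include <cstdint>

// Hypothetical stand-in types -- not Velox APIs.
struct FakeHeader {
  int32_t size;        // payload bytes
  bool isFree{false};  // true once the block is on a free list
};

constexpr int64_t kFakeHeaderSize = sizeof(FakeHeader);

// Analogue of HashStringAllocator::blockBytes(): payload plus header.
int64_t fakeBlockBytes(const FakeHeader& h) {
  return h.size + kFakeHeaderSize;
}

int main() {
  int64_t cumulativeBytes = 0;

  // Allocations grow the counter by whole block bytes, header included.
  FakeHeader a{64};
  FakeHeader b{128};
  cumulativeBytes += fakeBlockBytes(a);
  cumulativeBytes += fakeBlockBytes(b);

  // clear(): walk every block and subtract the non-free ones.
  for (const FakeHeader* h : {&a, &b}) {
    if (!h->isFree) {
      cumulativeBytes -= fakeBlockBytes(*h);
    }
  }

  // The invariant the new checks in clear() verify.
  assert(cumulativeBytes == 0);
  return 0;
}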
90 changes: 64 additions & 26 deletions velox/common/memory/HashStringAllocator.cpp
@@ -100,6 +100,45 @@ void HashStringAllocator::clear() {
for (auto i = 0; i < kNumFreeLists; ++i) {
new (&state_.freeLists()[i]) CompactDoubleList();
}

#ifdef NDEBUG
static const auto kHugePageSize = memory::AllocationTraits::kHugePageSize;
for (auto i = 0; i < state_.pool().numRanges(); ++i) {
const auto range = state_.pool().rangeAt(i);
const auto rangeSize = range.size();
if (rangeSize >= kHugePageSize) {
VELOX_CHECK_EQ(0, rangeSize % kHugePageSize);
}

for (int64_t blockOffset = 0; blockOffset < rangeSize;
blockOffset += kHugePageSize) {
auto blockRange = folly::Range<char*>(
range.data() + blockOffset,
std::min<int64_t>(rangeSize, kHugePageSize));
const auto size = blockRange.size() - simd::kPadding;
auto* end = castToHeader(blockRange.data() + size);
auto* header = castToHeader(blockRange.data());
while (header != end) {
VELOX_CHECK_GE(reinterpret_cast<char*>(header), blockRange.data());
VELOX_CHECK_LT(
reinterpret_cast<char*>(header), reinterpret_cast<char*>(end));
VELOX_CHECK_LE(
reinterpret_cast<char*>(header->end()),
reinterpret_cast<char*>(end));

// Continued and regular non-free blocks are both counted in cumulativeBytes_, so subtract them here.
if (!header->isFree()) {
state_.cumulativeBytes() -= blockBytes(header);
}
header = castToHeader(header->end());
}
}
}

VELOX_DCHECK_EQ(state_.cumulativeBytes(), 0);
VELOX_DCHECK_EQ(state_.sizeFromPool(), 0);
#endif

state_.pool().clear();
}

@@ -242,7 +281,7 @@ HashStringAllocator::finishWrite(
}

void HashStringAllocator::newSlab() {
constexpr int32_t kSimdPadding = simd::kPadding - sizeof(Header);
constexpr int32_t kSimdPadding = simd::kPadding - kHeaderSize;
const int64_t needed =
state_.pool().allocatedBytes() >= state_.pool().hugePageThreshold()
? memory::AllocationTraits::kHugePageSize
@@ -254,7 +293,7 @@ void HashStringAllocator::newSlab() {
// Sometimes the last range can be several huge pages for several huge page
// sized arenas, but checkConsistency() can interpret that.
VELOX_CHECK_EQ(state_.pool().freeBytes(), 0);
const auto available = needed - sizeof(Header) - kSimdPadding;
const auto available = needed - kHeaderSize - kSimdPadding;
VELOX_CHECK_GT(available, 0);

// Write end marker.
@@ -263,7 +302,7 @@

// Add the new memory to the free list: placement construct a header that
// covers the space from start to the end marker and add it to the free list.
free(new (run) Header(available - sizeof(Header)));
free(new (run) Header(available - kHeaderSize));
}

void HashStringAllocator::newRange(
@@ -290,7 +329,7 @@
// The last bytes of the last range are no longer payload. So do not count
// them in size and do not overwrite them if overwriting the multi-range
// entry. Set position at the new end.
lastRange->size -= sizeof(void*);
lastRange->size -= Header::kContinuedPtrSize;
lastRange->position = std::min(lastRange->size, lastRange->position);
}
*range = ByteRange{
@@ -330,7 +369,7 @@ StringView HashStringAllocator::contiguousString(

void HashStringAllocator::freeRestOfBlock(Header* header, int32_t keepBytes) {
keepBytes = std::max(keepBytes, kMinAlloc);
const int32_t freeSize = header->size() - keepBytes - sizeof(Header);
const int32_t freeSize = header->size() - keepBytes - kHeaderSize;
if (freeSize <= kMinAlloc) {
return;
}
@@ -359,8 +398,7 @@ HashStringAllocator::Header* HashStringAllocator::allocate(
bool exactSize) {
if (size > kMaxAlloc && exactSize) {
VELOX_CHECK_LE(size, Header::kSizeMask);
auto* header =
reinterpret_cast<Header*>(allocateFromPool(size + sizeof(Header)));
auto* header = castToHeader(allocateFromPool(size + kHeaderSize));
new (header) Header(size);
return header;
}
@@ -411,13 +449,13 @@ HashStringAllocator::Header* HashStringAllocator::allocateFromFreeList(
VELOX_CHECK(
found->isFree() && (!mustHaveSize || found->size() >= preferredSize));
--state_.numFree();
state_.freeBytes() -= found->size() + sizeof(Header);
state_.freeBytes() -= blockBytes(found);
removeFromFreeList(found);
auto* next = found->next();
if (next != nullptr) {
next->clearPreviousFree();
}
state_.cumulativeBytes() += found->size();
state_.cumulativeBytes() += blockBytes(found);
if (isFinalSize) {
freeRestOfBlock(found, preferredSize);
}
@@ -436,28 +474,28 @@ void HashStringAllocator::free(Header* header) {
!state_.pool().isInCurrentRange(headerToFree) &&
state_.allocationsFromPool().find(headerToFree) !=
state_.allocationsFromPool().end()) {
freeToPool(headerToFree, headerToFree->size() + sizeof(Header));
freeToPool(headerToFree, headerToFree->size() + kHeaderSize);
} else {
VELOX_CHECK(!headerToFree->isFree());
state_.freeBytes() += headerToFree->size() + sizeof(Header);
state_.cumulativeBytes() -= headerToFree->size();
state_.freeBytes() += blockBytes(headerToFree);
state_.cumulativeBytes() -= blockBytes(headerToFree);
Header* next = headerToFree->next();
if (next != nullptr) {
VELOX_CHECK(!next->isPreviousFree());
if (next->isFree()) {
--state_.numFree();
removeFromFreeList(next);
headerToFree->setSize(
headerToFree->size() + next->size() + sizeof(Header));
next = reinterpret_cast<Header*>(headerToFree->end());
headerToFree->size() + next->size() + kHeaderSize);
next = castToHeader(headerToFree->end());
VELOX_CHECK(next->isArenaEnd() || !next->isFree());
}
}
if (headerToFree->isPreviousFree()) {
auto* previousFree = getPreviousFree(headerToFree);
removeFromFreeList(previousFree);
previousFree->setSize(
previousFree->size() + headerToFree->size() + sizeof(Header));
previousFree->size() + headerToFree->size() + kHeaderSize);

headerToFree = previousFree;
} else {
@@ -570,7 +608,7 @@ inline bool HashStringAllocator::storeStringFast(
} else {
auto& freeList = state_.freeLists()[kNumFreeLists - 1];
header = headerOf(freeList.next());
const auto spaceTaken = roundedBytes + sizeof(Header);
const auto spaceTaken = roundedBytes + kHeaderSize;
if (spaceTaken > header->size()) {
return false;
}
@@ -587,7 +625,7 @@
reinterpret_cast<CompactDoubleList*>(freeHeader->begin()));
header->setSize(roundedBytes);
state_.freeBytes() -= spaceTaken;
state_.cumulativeBytes() += roundedBytes;
state_.cumulativeBytes() += roundedBytes + kHeaderSize;
} else {
header =
allocateFromFreeList(roundedBytes, true, true, kNumFreeLists - 1);
@@ -650,8 +688,8 @@ std::string HashStringAllocator::toString() const {
std::min<int64_t>(topRangeSize, kHugePageSize));
auto size = range.size() - simd::kPadding;

auto end = reinterpret_cast<Header*>(range.data() + size);
auto header = reinterpret_cast<Header*>(range.data());
auto end = castToHeader(range.data() + size);
auto header = castToHeader(range.data());
while (header != nullptr && header != end) {
out << "\t" << header->toString() << std::endl;
header = header->next();
@@ -683,8 +721,8 @@ int64_t HashStringAllocator::checkConsistency() const {
std::min<int64_t>(topRangeSize, kHugePageSize));
const auto size = range.size() - simd::kPadding;
bool previousFree = false;
auto* end = reinterpret_cast<Header*>(range.data() + size);
auto* header = reinterpret_cast<Header*>(range.data());
auto* end = castToHeader(range.data() + size);
auto* header = castToHeader(range.data());
while (header != end) {
VELOX_CHECK_GE(reinterpret_cast<char*>(header), range.data());
VELOX_CHECK_LT(
@@ -703,18 +741,18 @@
*(reinterpret_cast<int32_t*>(header->end()) - 1));
}
++numFree;
freeBytes += sizeof(Header) + header->size();
freeBytes += blockBytes(header);
} else if (header->isContinued()) {
// If the content of the header is continued, check the continued
// header is readable and not free.
auto* continued = header->nextContinued();
VELOX_CHECK(!continued->isFree());
allocatedBytes += header->size() - sizeof(void*);
allocatedBytes += blockBytes(header);
} else {
allocatedBytes += header->size();
allocatedBytes += blockBytes(header);
}
previousFree = header->isFree();
header = reinterpret_cast<Header*>(header->end());
header = castToHeader(header->end());
}
}
}
@@ -741,7 +779,7 @@ int64_t HashStringAllocator::checkConsistency() const {
} else {
VELOX_CHECK_GE(size - kMinAlloc, kNumFreeLists - 1);
}
bytesInFreeList += size + sizeof(Header);
bytesInFreeList += size + kHeaderSize;
}
}

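The walk added to clear() uses the same traversal as checkConsistency() and toString(): blocks sit back to back inside each arena range, so starting at the first header and repeatedly stepping over header plus payload visits every block up to the end marker. A rough stand-alone illustration under simplified assumptions (MiniHeader and the flat buffer are stand-ins, not the Velox layout):

#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Each block is a small header immediately followed by its payload bytes.
struct MiniHeader {
  int32_t size;  // payload bytes that follow this header
};

int main() {
  // Build a buffer holding three blocks with payloads of 16, 32 and 8 bytes.
  std::vector<char> buffer;
  for (int32_t payload : {16, 32, 8}) {
    MiniHeader h{payload};
    const char* raw = reinterpret_cast<const char*>(&h);
    buffer.insert(buffer.end(), raw, raw + sizeof(MiniHeader));
    buffer.insert(buffer.end(), payload, '\0');
  }

  // Walk block by block, accumulating whole block bytes (header + payload),
  // the way clear() and checkConsistency() step from header to header->end().
  int64_t totalBlockBytes = 0;
  const char* cursor = buffer.data();
  const char* end = buffer.data() + buffer.size();
  while (cursor < end) {
    MiniHeader h;
    std::memcpy(&h, cursor, sizeof(MiniHeader));
    totalBlockBytes += sizeof(MiniHeader) + h.size;
    cursor += sizeof(MiniHeader) + h.size;
  }

  // 3 headers plus 56 payload bytes.
  std::cout << "walked " << totalBlockBytes << " bytes" << std::endl;
  return 0;
}
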
20 changes: 15 additions & 5 deletions velox/common/memory/HashStringAllocator.h
@@ -130,7 +130,7 @@ class HashStringAllocator : public StreamArena {
/// Returns the Header of the block that is physically next to this block or
/// null if this is the last block of the arena.
Header* next() {
auto* next = reinterpret_cast<Header*>(end());
auto* next = castToHeader(end());
return next->data_ == kArenaEnd ? nullptr : next;
}

@@ -219,9 +219,18 @@

/// Returns the header immediately below 'data'.
static Header* headerOf(const void* data) {
return castToHeader(data) - 1;
}

/// Reinterprets 'data' as a Header pointer.
static Header* castToHeader(const void* data) {
return reinterpret_cast<Header*>(
const_cast<char*>(reinterpret_cast<const char*>(data))) -
1;
const_cast<char*>(reinterpret_cast<const char*>(data)));
}

/// Returns the byte size of the block pointed to by 'header', including the header itself.
inline size_t blockBytes(const Header* header) const {
return header->size() + kHeaderSize;
}

/// Returns ByteInputStream over the data in the range of 'header' and
@@ -306,8 +315,8 @@
/// the pointer because in the worst case we would have one allocation that
/// chains many small free blocks together via kContinued.
uint64_t freeSpace() const {
int64_t minFree = state_.freeBytes() -
state_.numFree() * (sizeof(Header) + sizeof(void*));
const int64_t minFree = state_.freeBytes() -
state_.numFree() * (kHeaderSize + Header::kContinuedPtrSize);
VELOX_CHECK_GE(minFree, 0, "Guaranteed free space cannot be negative");
return minFree;
}
@@ -358,6 +367,7 @@ class HashStringAllocator : public StreamArena {
static constexpr int32_t kUnitSize = 16 * memory::AllocationTraits::kPageSize;
static constexpr int32_t kMinContiguous = 48;
static constexpr int32_t kNumFreeLists = kMaxAlloc - kMinAlloc + 2;
static constexpr uint32_t kHeaderSize = sizeof(Header);

void newRange(
int32_t bytes,
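
The new castToHeader() helper makes the layout assumption behind headerOf() explicit: a block's payload begins immediately after its Header, so the header of a payload pointer is one Header behind it, and blockBytes() is the payload size plus kHeaderSize. A toy sketch of that relationship with illustrative names (ToyHeader, toyHeaderOf), not the Velox API:

#include <cassert>
#include <cstdint>
#include <new>

// Stand-in header: packs size and flag bits in the real allocator.
struct ToyHeader {
  uint32_t data;
};

ToyHeader* toyCastToHeader(void* p) {
  return reinterpret_cast<ToyHeader*>(p);
}

ToyHeader* toyHeaderOf(void* payload) {
  return toyCastToHeader(payload) - 1;  // step back over one header
}

int main() {
  // One header followed by 16 payload bytes, laid out contiguously.
  alignas(ToyHeader) char slab[sizeof(ToyHeader) + 16];
  auto* header = new (slab) ToyHeader{16};
  void* payload = slab + sizeof(ToyHeader);

  // headerOf(payload) lands back on the block's header.
  assert(toyHeaderOf(payload) == header);

  // blockBytes() analogue: header size plus payload size.
  const auto blockBytes = sizeof(ToyHeader) + header->data;
  assert(blockBytes == 20);
  return 0;
}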
