diff --git a/velox/common/memory/HashStringAllocator.cpp b/velox/common/memory/HashStringAllocator.cpp
index 0fe0a2b68a2da..b62e111c78cf4 100644
--- a/velox/common/memory/HashStringAllocator.cpp
+++ b/velox/common/memory/HashStringAllocator.cpp
@@ -100,6 +100,45 @@ void HashStringAllocator::clear() {
   for (auto i = 0; i < kNumFreeLists; ++i) {
     new (&state_.freeLists()[i]) CompactDoubleList();
   }
+
+#ifdef NDEBUG
+  static const auto kHugePageSize = memory::AllocationTraits::kHugePageSize;
+  for (auto i = 0; i < state_.pool().numRanges(); ++i) {
+    const auto range = state_.pool().rangeAt(i);
+    const auto rangeSize = range.size();
+    if (rangeSize >= kHugePageSize) {
+      VELOX_CHECK_EQ(0, rangeSize % kHugePageSize);
+    }
+
+    for (int64_t blockOffset = 0; blockOffset < rangeSize;
+         blockOffset += kHugePageSize) {
+      auto blockRange = folly::Range<char*>(
+          range.data() + blockOffset,
+          std::min(rangeSize, kHugePageSize));
+      const auto size = blockRange.size() - simd::kPadding;
+      auto* end = castToHeader(blockRange.data() + size);
+      auto* header = castToHeader(blockRange.data());
+      while (header != end) {
+        VELOX_CHECK_GE(reinterpret_cast<char*>(header), blockRange.data());
+        VELOX_CHECK_LT(
+            reinterpret_cast<char*>(header), reinterpret_cast<char*>(end));
+        VELOX_CHECK_LE(
+            reinterpret_cast<char*>(header->end()),
+            reinterpret_cast<char*>(end));
+
+        // Continued block & Non-free block.
+        if (!header->isFree()) {
+          state_.cumulativeBytes() -= blockBytes(header);
+        }
+        header = castToHeader(header->end());
+      }
+    }
+  }
+
+  VELOX_DCHECK_EQ(state_.cumulativeBytes(), 0);
+  VELOX_DCHECK_EQ(state_.sizeFromPool(), 0);
+#endif
+
   state_.pool().clear();
 }

@@ -242,7 +281,7 @@ HashStringAllocator::finishWrite(
 }

 void HashStringAllocator::newSlab() {
-  constexpr int32_t kSimdPadding = simd::kPadding - sizeof(Header);
+  constexpr int32_t kSimdPadding = simd::kPadding - kHeaderSize;
   const int64_t needed =
       state_.pool().allocatedBytes() >= state_.pool().hugePageThreshold()
       ? memory::AllocationTraits::kHugePageSize
@@ -254,7 +293,7 @@ void HashStringAllocator::newSlab() {
   // Sometimes the last range can be several huge pages for severl huge page
   // sized arenas but checkConsistency() can interpret that.
   VELOX_CHECK_EQ(state_.pool().freeBytes(), 0);
-  const auto available = needed - sizeof(Header) - kSimdPadding;
+  const auto available = needed - kHeaderSize - kSimdPadding;
   VELOX_CHECK_GT(available, 0);

   // Write end marker.
@@ -263,7 +302,7 @@ void HashStringAllocator::newSlab() {

   // Add the new memory to the free list: Placement construct a header that
   // covers the space from start to the end marker and add this to free list.
-  free(new (run) Header(available - sizeof(Header)));
+  free(new (run) Header(available - kHeaderSize));
 }

 void HashStringAllocator::newRange(
@@ -290,7 +329,7 @@
     // The last bytes of the last range are no longer payload. So do not count
     // them in size and do not overwrite them if overwriting the multi-range
     // entry. Set position at the new end.
-    lastRange->size -= sizeof(void*);
+    lastRange->size -= Header::kContinuedPtrSize;
     lastRange->position = std::min(lastRange->size, lastRange->position);
   }
   *range = ByteRange{
@@ -330,7 +369,7 @@ StringView HashStringAllocator::contiguousString(

 void HashStringAllocator::freeRestOfBlock(Header* header, int32_t keepBytes) {
   keepBytes = std::max(keepBytes, kMinAlloc);
-  const int32_t freeSize = header->size() - keepBytes - sizeof(Header);
+  const int32_t freeSize = header->size() - keepBytes - kHeaderSize;
   if (freeSize <= kMinAlloc) {
     return;
   }
@@ -359,8 +398,7 @@ HashStringAllocator::Header* HashStringAllocator::allocate(
     bool exactSize) {
   if (size > kMaxAlloc && exactSize) {
     VELOX_CHECK_LE(size, Header::kSizeMask);
-    auto* header =
-        reinterpret_cast<Header*>(allocateFromPool(size + sizeof(Header)));
+    auto* header = castToHeader(allocateFromPool(size + kHeaderSize));
     new (header) Header(size);
     return header;
   }
@@ -411,13 +449,13 @@ HashStringAllocator::Header* HashStringAllocator::allocateFromFreeList(
   VELOX_CHECK(
       found->isFree() && (!mustHaveSize || found->size() >= preferredSize));
   --state_.numFree();
-  state_.freeBytes() -= found->size() + sizeof(Header);
+  state_.freeBytes() -= blockBytes(found);
   removeFromFreeList(found);
   auto* next = found->next();
   if (next != nullptr) {
     next->clearPreviousFree();
   }
-  state_.cumulativeBytes() += found->size();
+  state_.cumulativeBytes() += blockBytes(found);
   if (isFinalSize) {
     freeRestOfBlock(found, preferredSize);
   }
@@ -436,11 +474,11 @@ void HashStringAllocator::free(Header* header) {
         !state_.pool().isInCurrentRange(headerToFree) &&
         state_.allocationsFromPool().find(headerToFree) !=
             state_.allocationsFromPool().end()) {
-      freeToPool(headerToFree, headerToFree->size() + sizeof(Header));
+      freeToPool(headerToFree, headerToFree->size() + kHeaderSize);
     } else {
       VELOX_CHECK(!headerToFree->isFree());
-      state_.freeBytes() += headerToFree->size() + sizeof(Header);
-      state_.cumulativeBytes() -= headerToFree->size();
+      state_.freeBytes() += blockBytes(headerToFree);
+      state_.cumulativeBytes() -= blockBytes(headerToFree);
       Header* next = headerToFree->next();
       if (next != nullptr) {
         VELOX_CHECK(!next->isPreviousFree());
@@ -448,8 +486,8 @@ void HashStringAllocator::free(Header* header) {
           --state_.numFree();
           removeFromFreeList(next);
           headerToFree->setSize(
-              headerToFree->size() + next->size() + sizeof(Header));
-          next = reinterpret_cast<Header*>(headerToFree->end());
+              headerToFree->size() + next->size() + kHeaderSize);
+          next = castToHeader(headerToFree->end());
           VELOX_CHECK(next->isArenaEnd() || !next->isFree());
         }
       }
@@ -457,7 +495,7 @@
       auto* previousFree = getPreviousFree(headerToFree);
       removeFromFreeList(previousFree);
       previousFree->setSize(
-          previousFree->size() + headerToFree->size() + sizeof(Header));
+          previousFree->size() + headerToFree->size() + kHeaderSize);

       headerToFree = previousFree;
     } else {
@@ -570,7 +608,7 @@ inline bool HashStringAllocator::storeStringFast(
   } else {
     auto& freeList = state_.freeLists()[kNumFreeLists - 1];
     header = headerOf(freeList.next());
-    const auto spaceTaken = roundedBytes + sizeof(Header);
+    const auto spaceTaken = roundedBytes + kHeaderSize;
     if (spaceTaken > header->size()) {
       return false;
     }
@@ -587,7 +625,7 @@ inline bool HashStringAllocator::storeStringFast(
           reinterpret_cast<CompactDoubleList*>(freeHeader->begin()));
       header->setSize(roundedBytes);
       state_.freeBytes() -= spaceTaken;
-      state_.cumulativeBytes() += roundedBytes;
+      state_.cumulativeBytes() += roundedBytes + kHeaderSize;
     } else {
       header =
           allocateFromFreeList(roundedBytes, true, true, kNumFreeLists - 1);
@@ -650,8 +688,8 @@ std::string HashStringAllocator::toString() const {
           std::min(topRangeSize, kHugePageSize));
       auto size = range.size() - simd::kPadding;
-      auto end = reinterpret_cast<Header*>(range.data() + size);
-      auto header = reinterpret_cast<Header*>(range.data());
+      auto end = castToHeader(range.data() + size);
+      auto header = castToHeader(range.data());
       while (header != nullptr && header != end) {
         out << "\t" << header->toString() << std::endl;
         header = header->next();
       }
@@ -683,8 +721,8 @@ int64_t HashStringAllocator::checkConsistency() const {
          std::min(topRangeSize, kHugePageSize));
      const auto size = range.size() - simd::kPadding;
      bool previousFree = false;
-      auto* end = reinterpret_cast<Header*>(range.data() + size);
-      auto* header = reinterpret_cast<Header*>(range.data());
+      auto* end = castToHeader(range.data() + size);
+      auto* header = castToHeader(range.data());
      while (header != end) {
        VELOX_CHECK_GE(reinterpret_cast<char*>(header), range.data());
        VELOX_CHECK_LT(
@@ -703,18 +741,18 @@ int64_t HashStringAllocator::checkConsistency() const {
                *(reinterpret_cast<int32_t*>(header->end()) - 1));
          }
          ++numFree;
-          freeBytes += sizeof(Header) + header->size();
+          freeBytes += blockBytes(header);
        } else if (header->isContinued()) {
          // If the content of the header is continued, check the continued
          // header is readable and not free.
          auto* continued = header->nextContinued();
          VELOX_CHECK(!continued->isFree());
-          allocatedBytes += header->size() - sizeof(void*);
+          allocatedBytes += blockBytes(header);
        } else {
-          allocatedBytes += header->size();
+          allocatedBytes += blockBytes(header);
        }
        previousFree = header->isFree();
-        header = reinterpret_cast<Header*>(header->end());
+        header = castToHeader(header->end());
      }
    }
  }
@@ -741,7 +779,7 @@ int64_t HashStringAllocator::checkConsistency() const {
      } else {
        VELOX_CHECK_GE(size - kMinAlloc, kNumFreeLists - 1);
      }
-      bytesInFreeList += size + sizeof(Header);
+      bytesInFreeList += size + kHeaderSize;
    }
  }

diff --git a/velox/common/memory/HashStringAllocator.h b/velox/common/memory/HashStringAllocator.h
index 1b3ddd7cd40f3..cd3025ac33858 100644
--- a/velox/common/memory/HashStringAllocator.h
+++ b/velox/common/memory/HashStringAllocator.h
@@ -130,7 +130,7 @@ class HashStringAllocator : public StreamArena {
    /// Returns the Header of the block that is physically next to this block or
    /// null if this is the last block of the arena.
    Header* next() {
-      auto* next = reinterpret_cast<Header*>(end());
+      auto* next = castToHeader(end());
      return next->data_ == kArenaEnd ? nullptr : next;
    }

@@ -219,9 +219,18 @@ class HashStringAllocator : public StreamArena {

   /// Returns the header immediately below 'data'.
   static Header* headerOf(const void* data) {
+    return castToHeader(data) - 1;
+  }
+
+  /// Returns the header below 'data'.
+  static Header* castToHeader(const void* data) {
     return reinterpret_cast<Header*>(
-        const_cast<char*>(reinterpret_cast<const char*>(data))) -
-        1;
+        const_cast<char*>(reinterpret_cast<const char*>(data)));
+  }
+
+  /// Returns the byte size of block pointed by 'header'.
+  inline size_t blockBytes(const Header* header) const {
+    return header->size() + kHeaderSize;
   }

   /// Returns ByteInputStream over the data in the range of 'header' and
@@ -306,8 +315,8 @@ class HashStringAllocator : public StreamArena {
   /// the pointer because in the worst case we would have one allocation that
   /// chains many small free blocks together via kContinued.
   uint64_t freeSpace() const {
-    int64_t minFree = state_.freeBytes() -
-        state_.numFree() * (sizeof(Header) + sizeof(void*));
+    const int64_t minFree = state_.freeBytes() -
+        state_.numFree() * (kHeaderSize + Header::kContinuedPtrSize);
     VELOX_CHECK_GE(minFree, 0, "Guaranteed free space cannot be negative");
     return minFree;
   }
@@ -358,6 +367,7 @@ class HashStringAllocator : public StreamArena {
   static constexpr int32_t kUnitSize = 16 * memory::AllocationTraits::kPageSize;
   static constexpr int32_t kMinContiguous = 48;
   static constexpr int32_t kNumFreeLists = kMaxAlloc - kMinAlloc + 2;
+  static constexpr uint32_t kHeaderSize = sizeof(Header);

   void newRange(
       int32_t bytes,
diff --git a/velox/common/memory/tests/HashStringAllocatorTest.cpp b/velox/common/memory/tests/HashStringAllocatorTest.cpp
index 608d1a83fe846..ebff6583f1bc1 100644
--- a/velox/common/memory/tests/HashStringAllocatorTest.cpp
+++ b/velox/common/memory/tests/HashStringAllocatorTest.cpp
@@ -55,7 +55,8 @@ class HashStringAllocatorTest : public testing::Test {

   void initializeContents(HSA::Header* header) {
     auto sequence = ++sequence_;
-    int32_t numWords = header->size() / sizeof(void*);
+    int32_t numWords =
+        header->size() / HashStringAllocator::Header::kContinuedPtrSize;
     void** ptr = reinterpret_cast<void**>(header->begin());
     ptr[0] = reinterpret_cast<void*>(sequence);
     for (int32_t offset = 1; offset < numWords; offset++) {
@@ -99,6 +100,48 @@ class HashStringAllocatorTest : public testing::Test {
   folly::Random::DefaultGenerator rng_;
 };

+TEST_F(HashStringAllocatorTest, multipleFree) {
+  ASSERT_NO_THROW(allocator_->toString());
+
+  auto h1 = allocate(123);
+  ASSERT_EQ(h1->toString(), "size: 123");
+
+  allocator_->free(h1);
+  // Running free() multiple times on the same memory block should result in an
+  // error.
+  VELOX_ASSERT_THROW(allocator_->free(h1), "");
+}
+
+TEST_F(HashStringAllocatorTest, multipleFreeAndCheckCumulativeBytes) {
+  ASSERT_NO_THROW(allocator_->toString());
+
+  auto h1 = allocate(123);
+  auto h2 = allocate(456);
+  auto h3 = allocate(789);
+
+  ASSERT_EQ(h1->toString(), "size: 123");
+  ASSERT_EQ(h2->toString(), "size: 456");
+  ASSERT_EQ(h3->toString(), "size: 789");
+
+  auto allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
+
+  allocator_->free(h3);
+  allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
+
+  allocator_->free(h2);
+  allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
+
+  allocator_->free(h1);
+  allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
+
+  // After all blocks execute free(), the allocated bytes should be equal to 0.
+  ASSERT_EQ(allocator_->cumulativeBytes(), 0);
+}
+
 TEST_F(HashStringAllocatorTest, headerToString) {
   ASSERT_NO_THROW(allocator_->toString());

@@ -141,7 +184,8 @@ TEST_F(HashStringAllocatorTest, allocate) {
       headers.push_back(allocate((i % 10) * 10));
     }
     EXPECT_FALSE(allocator_->isEmpty());
-    allocator_->checkConsistency();
+    auto allocatedBytes = allocator_->checkConsistency();
+    ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
     for (int32_t step = 7; step >= 1; --step) {
       for (auto i = 0; i < headers.size(); i += step) {
         if (headers[i]) {
@@ -149,7 +193,8 @@
           headers[i] = nullptr;
         }
       }
-      allocator_->checkConsistency();
+      allocatedBytes = allocator_->checkConsistency();
+      ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
     }
   }
   EXPECT_TRUE(allocator_->isEmpty());
@@ -220,7 +265,8 @@ TEST_F(HashStringAllocatorTest, finishWrite) {
   inputStream.readBytes(copy.data(), 4);
   ASSERT_EQ(copy, "abcd");

-  allocator_->checkConsistency();
+  auto allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());

   std::vector<int32_t> sizes = {
       50000, 100000, 200000, 1000000, 3000000, 5000000};
@@ -236,7 +282,8 @@
     copy.resize(largeString.size());
     inStream.readBytes(copy.data(), copy.size());
     ASSERT_EQ(copy, largeString);
-    allocator_->checkConsistency();
+    allocatedBytes = allocator_->checkConsistency();
+    ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
   }
 }

@@ -273,7 +320,8 @@ TEST_F(HashStringAllocatorTest, multipart) {
       data[i].reference.insert(
           data[i].reference.end(), chars.begin(), chars.end());
     }
-    allocator_->checkConsistency();
+    auto allocatedBytes = allocator_->checkConsistency();
+    ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
   }
   for (const auto& d : data) {
     if (d.start.isSet()) {
@@ -283,10 +331,12 @@
   for (auto& d : data) {
     if (d.start.isSet()) {
       checkAndFree(d);
-      allocator_->checkConsistency();
+      auto allocatedBytes = allocator_->checkConsistency();
+      ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
     }
   }
-  allocator_->checkConsistency();
+  auto allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
 }

 TEST_F(HashStringAllocatorTest, mixedMultipart) {
@@ -317,7 +367,8 @@ TEST_F(HashStringAllocatorTest, mixedMultipart) {

   allocator_->free(start.header);

-  allocator_->checkConsistency();
+  auto allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
 }

 TEST_F(HashStringAllocatorTest, rewrite) {
@@ -398,7 +449,8 @@ TEST_F(HashStringAllocatorTest, stlAllocator) {
     }
   }

-  allocator_->checkConsistency();
+  auto allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());

   // We allow for some overhead for free lists after all is freed.
   EXPECT_LE(allocator_->retainedSize() - allocator_->freeSpace(), 100);
@@ -433,7 +485,8 @@ TEST_F(HashStringAllocatorTest, stlAllocatorWithSet) {
     }
   }

-  allocator_->checkConsistency();
+  auto allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());

   // We allow for some overhead for free lists after all is freed.
   EXPECT_LE(allocator_->retainedSize() - allocator_->freeSpace(), 220);
@@ -470,7 +523,8 @@ TEST_F(HashStringAllocatorTest, alignedStlAllocatorWithF14Map) {
     }
   }

-  allocator_->checkConsistency();
+  auto allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());

   // We allow for some overhead for free lists after all is freed. Map tends to
   // generate more free blocks at the end, so we loosen the upper bound a bit.
@@ -484,14 +538,16 @@ TEST_F(HashStringAllocatorTest, alignedStlAllocatorLargeAllocation) {
   AlignedStlAllocator<int64_t, 16> alignedAlloc16(allocator_.get());
   int64_t* ptr = alignedAlloc16.allocate(allocateSize);
   alignedAlloc16.deallocate(ptr, allocateSize);
-  allocator_->checkConsistency();
+  auto allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());

   // Test large allocation + un-aligned pool.
   ASSERT_LT(allocator_->pool()->alignment(), 128);
   AlignedStlAllocator<int64_t, 128> alignedAlloc128(allocator_.get());
   ptr = alignedAlloc128.allocate(allocateSize);
   alignedAlloc128.deallocate(ptr, allocateSize);
-  allocator_->checkConsistency();
+  allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
 }

 TEST_F(HashStringAllocatorTest, stlAllocatorOverflow) {
@@ -580,7 +636,8 @@ TEST_F(HashStringAllocatorTest, strings) {
     views.push_back(StringView(str.data(), str.size()));
     allocator_->copyMultipart(views[i], reinterpret_cast<char*>(&views[i]), 0);
     if (i % 10 == 0) {
-      allocator_->checkConsistency();
+      auto allocatedBytes = allocator_->checkConsistency();
+      ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
     }
   }
   for (auto i = 0; i < strings.size(); ++i) {
@@ -592,7 +649,8 @@
         StringView(strings[i]) ==
         HashStringAllocator::contiguousString(views[i], temp));
   }
-  allocator_->checkConsistency();
+  auto allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
 }

 TEST_F(HashStringAllocatorTest, sizeAndPosition) {
@@ -662,12 +720,15 @@ TEST_F(HashStringAllocatorTest, sizeAndPosition) {

 TEST_F(HashStringAllocatorTest, storeStringFast) {
   allocator_->allocate(HashStringAllocator::kMinAlloc);
-  std::string s(allocator_->freeSpace() + sizeof(void*), 'x');
+  std::string s(
+      allocator_->freeSpace() + HashStringAllocator::Header::kContinuedPtrSize,
+      'x');
   StringView sv(s);
   allocator_->copyMultipart(sv, reinterpret_cast<char*>(&sv), 0);
   ASSERT_NE(sv.data(), s.data());
   ASSERT_EQ(sv, StringView(s));
-  allocator_->checkConsistency();
+  auto allocatedBytes = allocator_->checkConsistency();
+  ASSERT_EQ(allocatedBytes, allocator_->cumulativeBytes());
 }

 TEST_F(HashStringAllocatorTest, clear) {
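
Note (not part of the patch): every checkConsistency() assertion added above exercises the same invariant. Once both sides account for the per-block Header overhead via blockBytes(header) == header->size() + kHeaderSize, the byte total obtained by walking every arena block must equal the running cumulativeBytes() counter at any point, and the counter must return to zero after all blocks are freed. A minimal sketch of that invariant, reusing the allocate() helper and allocator_ member of the existing test fixture (a hypothetical extra test, not something this diff adds):

TEST_F(HashStringAllocatorTest, cumulativeBytesSketch) {
  // Allocating bumps cumulativeBytes() by payload plus header, the same
  // quantity that checkConsistency() sums while walking the arenas.
  auto* header = allocate(1000);
  ASSERT_EQ(allocator_->checkConsistency(), allocator_->cumulativeBytes());

  // Freeing subtracts the same blockBytes(), so walk and counter stay equal.
  allocator_->free(header);
  ASSERT_EQ(allocator_->checkConsistency(), allocator_->cumulativeBytes());

  // With nothing left allocated, the counter drops back to zero.
  ASSERT_EQ(allocator_->cumulativeBytes(), 0);
}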