Skip to content

Commit

Permalink
Advance velox version with memory back out
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng authored and spershin committed Dec 15, 2022
1 parent 2d8839b commit a7f0666
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 35 deletions.
19 changes: 11 additions & 8 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include "presto_cpp/main/common/Counters.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/memory/MappedMemory.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/memory/MemoryAllocator.h"
#include "velox/common/memory/MmapAllocator.h"
#include "velox/exec/Driver.h"

Expand All @@ -42,7 +42,10 @@ PeriodicTaskManager::PeriodicTaskManager(
: driverCPUExecutor_(driverCPUExecutor),
httpExecutor_(httpExecutor),
taskManager_(taskManager),
memoryManager_(velox::memory::MemoryManager::getInstance()) {}
memoryManager_(
velox::memory::MemoryManager<
velox::memory::MmapMemoryAllocator>::getProcessDefaultManager()) {
}

void PeriodicTaskManager::start() {
// Add new functions here.
Expand Down Expand Up @@ -126,16 +129,16 @@ void PeriodicTaskManager::start() {
"clean_old_tasks");
}

if (auto* allocator = velox::memory::MemoryAllocator::getInstance()) {
if (auto* mappedMemory = velox::memory::MappedMemory::getInstance()) {
scheduler_.addFunction(
[allocator]() {
[mappedMemory]() {
REPORT_ADD_STAT_VALUE(
kCounterMappedMemoryBytes, (allocator->numMapped() * 4096l));
kCounterMappedMemoryBytes, (mappedMemory->numMapped() * 4096l));
REPORT_ADD_STAT_VALUE(
kCounterAllocatedMemoryBytes,
(allocator->numAllocated() * 4096l));
(mappedMemory->numAllocated() * 4096l));
auto allocBytesCounters =
velox::memory::MemoryAllocator::allocateBytesStats();
velox::memory::MappedMemory::allocateBytesStats();
REPORT_ADD_STAT_VALUE(
kCounterMappedMemoryRawAllocBytesSmall,
(allocBytesCounters.totalSmall));
Expand All @@ -151,7 +154,7 @@ void PeriodicTaskManager::start() {
}

if (auto* asyncDataCache = dynamic_cast<velox::cache::AsyncDataCache*>(
velox::memory::MemoryAllocator::getInstance())) {
velox::memory::MappedMemory::getInstance())) {
scheduler_.addFunction(
[asyncDataCache]() {
velox::cache::CacheStats memoryCacheStats =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class PeriodicTaskManager {
folly::CPUThreadPoolExecutor* driverCPUExecutor_;
folly::IOThreadPoolExecutor* httpExecutor_;
TaskManager* taskManager_;
velox::memory::MemoryManager& memoryManager_;
velox::memory::MemoryManager<velox::memory::MmapMemoryAllocator>&
memoryManager_;
};

} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ void PrestoExchangeSource::processDataResponse(
page = std::make_unique<exec::SerializedPage>(
std::move(singleChain),
pool_,
[allocator = response->allocator()](folly::IOBuf& iobuf) {
[mappedMemory = response->mappedMemory()](folly::IOBuf& iobuf) {
// Free the backed memory from MemoryAllocator on page dtor.
folly::IOBuf* start = &iobuf;
auto curr = start;
do {
allocator->freeBytes(curr->writableData(), curr->length());
mappedMemory->freeBytes(curr->writableData(), curr->length());
curr = curr->next();
} while (curr != start);
});
Expand Down
7 changes: 4 additions & 3 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,9 @@ void PrestoServer::initializeAsyncCache() {
options.mmapArenaCapacityRatio = systemConfig->mmapArenaCapacityRatio();

auto allocator = std::make_shared<memory::MmapAllocator>(options);
cache_ = std::make_shared<cache::AsyncDataCache>(
mappedMemory_ = std::make_shared<cache::AsyncDataCache>(
allocator, memoryBytes, std::move(ssd));
memory::MemoryAllocator::setDefaultInstance(cache_.get());
memory::MappedMemory::setDefaultInstance(mappedMemory_.get());
}

void PrestoServer::stop() {
Expand Down Expand Up @@ -487,7 +487,8 @@ std::shared_ptr<velox::connector::Connector> PrestoServer::connectorWithCache(
const std::string& connectorName,
const std::string& catalogName,
std::shared_ptr<const velox::Config> properties) {
VELOX_CHECK_NOT_NULL(cache_);
VELOX_CHECK_NOT_NULL(
dynamic_cast<cache::AsyncDataCache*>(mappedMemory_.get()));
LOG(INFO) << "STARTUP: Using AsyncDataCache";
return facebook::velox::connector::getConnectorFactory(connectorName)
->newConnector(
Expand Down
4 changes: 2 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <velox/expression/Expr.h>
#include "presto_cpp/main/CPUMon.h"
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/memory/MemoryAllocator.h"
#include "velox/common/memory/MappedMemory.h"
#if __has_include("filesystem")
#include <filesystem>
namespace fs = std::filesystem;
Expand Down Expand Up @@ -115,7 +115,7 @@ class PrestoServer {
std::unique_ptr<folly::IOThreadPoolExecutor> connectorIoExecutor_;

// Instance of AsyncDataCache used for all large allocations.
std::shared_ptr<velox::cache::AsyncDataCache> cache_;
std::shared_ptr<velox::memory::MappedMemory> mappedMemory_;

std::unique_ptr<http::HttpServer> httpServer_;
std::unique_ptr<SignalHandler> signalHandler_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtx(
executor().get(),
config,
connectorConfigs,
memory::MemoryAllocator::getInstance(),
memory::MappedMemory::getInstance(),
std::move(pool),
spillExecutor(),
queryId);
Expand Down
4 changes: 2 additions & 2 deletions presto-native-execution/presto_cpp/main/http/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ HttpResponse::~HttpResponse() {
// Clear out any leftover iobufs if not consumed.
for (auto& buf : bodyChain_) {
if (buf) {
allocator_->freeBytes(buf->writableData(), buf->length());
mappedMemory_->freeBytes(buf->writableData(), buf->length());
}
}
}
Expand All @@ -50,7 +50,7 @@ void HttpResponse::append(std::unique_ptr<folly::IOBuf>&& iobuf) {
VELOX_CHECK(!iobuf->isChained());
uint64_t dataLength = iobuf->length();

void* buf = allocator_->allocateBytes(dataLength);
void* buf = mappedMemory_->allocateBytes(dataLength);
if (buf == nullptr) {
VELOX_FAIL(
"Cannot spare enough system memory to receive more HTTP response.");
Expand Down
10 changes: 5 additions & 5 deletions presto-native-execution/presto_cpp/main/http/HttpClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include <proxygen/lib/http/HTTPConnector.h>
#include <proxygen/lib/http/connpool/SessionPool.h>
#include <proxygen/lib/http/session/HTTPUpstreamSession.h>
#include <velox/common/memory/MemoryAllocator.h>
#include <velox/common/memory/MappedMemory.h>
#include "presto_cpp/main/http/HttpConstants.h"
#include "velox/common/base/Exceptions.h"

Expand All @@ -26,7 +26,7 @@ class HttpResponse {
public:
explicit HttpResponse(std::unique_ptr<proxygen::HTTPMessage> headers)
: headers_(std::move(headers)),
allocator_(velox::memory::MemoryAllocator::getInstance()) {}
mappedMemory_(velox::memory::MappedMemory::getInstance()) {}

~HttpResponse();

Expand All @@ -49,15 +49,15 @@ class HttpResponse {
return std::move(bodyChain_);
}

velox::memory::MemoryAllocator* FOLLY_NONNULL allocator() {
return allocator_;
velox::memory::MappedMemory* FOLLY_NONNULL mappedMemory() {
return mappedMemory_;
}

std::string dumpBodyChain() const;

private:
const std::unique_ptr<proxygen::HTTPMessage> headers_;
velox::memory::MemoryAllocator* FOLLY_NONNULL const allocator_;
velox::memory::MappedMemory* FOLLY_NONNULL const mappedMemory_;

std::vector<std::unique_ptr<folly::IOBuf>> bodyChain_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ std::string bodyAsString(http::HttpResponse& response) {
auto iobufs = response.consumeBody();
for (auto& body : iobufs) {
oss << std::string((const char*)body->data(), body->length());
response.allocator()->freeBytes(body->writableData(), body->length());
response.mappedMemory()->freeBytes(body->writableData(), body->length());
}
EXPECT_EQ(response.allocator()->numAllocated(), 0);
EXPECT_EQ(response.mappedMemory()->numAllocated(), 0);
return oss.str();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <folly/init/Init.h>
#include <gtest/gtest.h>

#include <velox/common/memory/MemoryAllocator.h>
#include <velox/common/memory/MappedMemory.h>
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/http/HttpClient.h"
#include "presto_cpp/main/http/HttpServer.h"
Expand Down Expand Up @@ -294,18 +294,21 @@ folly::Uri makeProducerUri(const folly::SocketAddress& address) {
class PrestoExchangeSourceTest : public testing::Test {
public:
void SetUp() override {
auto& defaultManager = memory::MemoryManager::getInstance();
auto& defaultManager =
memory::MemoryManager<memory::MemoryAllocator, memory::kNoAlignment>::
getProcessDefaultManager();
auto& pool =
dynamic_cast<memory::MemoryPoolImpl&>(defaultManager.getRoot());
dynamic_cast<memory::MemoryPoolImpl<memory::MemoryAllocator, 16>&>(
defaultManager.getRoot());
pool_ = &pool;
memory::MmapAllocatorOptions options;
options.capacity = 1L << 30;
allocator_ = std::make_unique<memory::MmapAllocator>(options);
memory::MemoryAllocator::setDefaultInstance(allocator_.get());
mappedMemory_ = std::make_unique<memory::MmapAllocator>(options);
memory::MappedMemory::setDefaultInstance(mappedMemory_.get());
}

void TearDown() override {
memory::MemoryAllocator::setDefaultInstance(nullptr);
memory::MappedMemory::setDefaultInstance(nullptr);
}

void requestNextPage(
Expand All @@ -319,7 +322,7 @@ class PrestoExchangeSourceTest : public testing::Test {
}

memory::MemoryPool* pool_;
std::unique_ptr<memory::MemoryAllocator> allocator_;
std::unique_ptr<memory::MappedMemory> mappedMemory_;
};

TEST_F(PrestoExchangeSourceTest, basic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class Cursor {
}

memory::MemoryPool* pool_;
memory::MemoryAllocator* allocator_ = memory::MemoryAllocator::getInstance();
memory::MappedMemory* mappedMemory_ = memory::MappedMemory::getInstance();
TaskManager* taskManager_;
const protocol::TaskId taskId_;
RowTypePtr rowType_;
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 90 files
+2 −2 velox/benchmarks/tpch/TpchBenchmark.cpp
+2 −1 velox/buffer/tests/BufferTest.cpp
+32 −44 velox/common/caching/AsyncDataCache.cpp
+26 −29 velox/common/caching/AsyncDataCache.h
+1 −1 velox/common/caching/SsdFile.cpp
+1 −1 velox/common/caching/SsdFile.h
+35 −41 velox/common/caching/tests/AsyncDataCacheTest.cpp
+8 −8 velox/common/caching/tests/SsdFileTest.cpp
+1 −2 velox/common/hyperloglog/tests/DenseHllTest.cpp
+2 −4 velox/common/hyperloglog/tests/SparseHllTest.cpp
+21 −18 velox/common/memory/AllocationPool.cpp
+21 −19 velox/common/memory/AllocationPool.h
+5 −5 velox/common/memory/ByteStream.h
+1 −1 velox/common/memory/CMakeLists.txt
+1 −2 velox/common/memory/HashStringAllocator.cpp
+6 −6 velox/common/memory/HashStringAllocator.h
+555 −0 velox/common/memory/MappedMemory.cpp
+239 −230 velox/common/memory/MappedMemory.h
+105 −371 velox/common/memory/Memory.cpp
+571 −193 velox/common/memory/Memory.h
+0 −584 velox/common/memory/MemoryAllocator.cpp
+139 −226 velox/common/memory/MmapAllocator.cpp
+66 −68 velox/common/memory/MmapAllocator.h
+45 −50 velox/common/memory/MmapArena.cpp
+14 −16 velox/common/memory/MmapArena.h
+15 −13 velox/common/memory/StreamArena.cpp
+9 −10 velox/common/memory/StreamArena.h
+9 −14 velox/common/memory/tests/ByteStreamTest.cpp
+2 −2 velox/common/memory/tests/CMakeLists.txt
+6 −12 velox/common/memory/tests/FragmentationBenchmark.cpp
+2 −3 velox/common/memory/tests/HashStringAllocatorTest.cpp
+923 −0 velox/common/memory/tests/MappedMemoryTest.cpp
+0 −1,498 velox/common/memory/tests/MemoryAllocatorTest.cpp
+14 −59 velox/common/memory/tests/MemoryManagerTest.cpp
+16 −17 velox/common/memory/tests/MemoryPoolBenchmark.cpp
+76 −423 velox/common/memory/tests/MemoryPoolTest.cpp
+10 −10 velox/connectors/Connector.h
+3 −3 velox/connectors/hive/HiveConnector.cpp
+3 −3 velox/connectors/hive/HiveConnector.h
+1 −1 velox/connectors/hive/tests/HiveWriteProtocolTest.cpp
+10 −10 velox/core/QueryCtx.h
+3 −3 velox/dwio/common/CacheInputStream.cpp
+7 −6 velox/dwio/common/CachedBufferedInput.cpp
+1 −1 velox/dwio/common/DataBuffer.h
+2 −3 velox/dwio/dwrf/test/CacheInputTest.cpp
+9 −45 velox/dwio/dwrf/test/WriterFlushTest.cpp
+2 −2 velox/exec/Exchange.h
+8 −7 velox/exec/GroupingSet.cpp
+2 −1 velox/exec/GroupingSet.h
+6 −5 velox/exec/HashBuild.cpp
+3 −0 velox/exec/HashBuild.h
+8 −6 velox/exec/HashTable.cpp
+7 −7 velox/exec/HashTable.h
+4 −0 velox/exec/Merge.h
+9 −1 velox/exec/Operator.cpp
+3 −0 velox/exec/Operator.h
+5 −3 velox/exec/OrderBy.cpp
+2 −0 velox/exec/OrderBy.h
+6 −4 velox/exec/PartitionedOutput.cpp
+4 −3 velox/exec/PartitionedOutput.h
+13 −10 velox/exec/RowContainer.cpp
+14 −15 velox/exec/RowContainer.h
+4 −3 velox/exec/Spill.cpp
+9 −3 velox/exec/Spill.h
+8 −8 velox/exec/Spiller.cpp
+5 −5 velox/exec/Spiller.h
+1 −1 velox/exec/StreamingAggregation.cpp
+8 −1 velox/exec/Task.cpp
+10 −0 velox/exec/Task.h
+3 −1 velox/exec/TopN.cpp
+2 −2 velox/exec/Window.cpp
+3 −3 velox/exec/tests/AggregationTest.cpp
+2 −2 velox/exec/tests/HashJoinBridgeTest.cpp
+5 −4 velox/exec/tests/HashTableTest.cpp
+6 −6 velox/exec/tests/PartitionedOutputBufferManagerTest.cpp
+2 −2 velox/exec/tests/RowContainerTest.cpp
+11 −4 velox/exec/tests/SpillTest.cpp
+2 −2 velox/exec/tests/TableScanTest.cpp
+2 −2 velox/exec/tests/utils/HiveConnectorTestBase.h
+4 −4 velox/exec/tests/utils/OperatorTestBase.cpp
+3 −1 velox/exec/tests/utils/RowContainerTestBase.h
+1 −2 velox/functions/lib/tests/KllSketchTest.cpp
+2 −2 velox/functions/prestosql/aggregates/tests/ValueListTest.cpp
+1 −2 velox/functions/prestosql/tests/HyperLogLogFunctionsTest.cpp
+4 −4 velox/serializers/UnsafeRowSerializer.cpp
+4 −2 velox/serializers/tests/PrestoSerializerTest.cpp
+2 −1 velox/serializers/tests/UnsafeRowSerializerTest.cpp
+18 −18 velox/vector/VectorStream.h
+5 −2 velox/vector/tests/VectorTest.cpp
+2 −1 velox/vector/tests/VectorUtilTest.cpp

0 comments on commit a7f0666

Please sign in to comment.