Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: facebookincubator/velox
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: e1c2fc034ff3c73e5c629b94e8a728c16709ffe7
Choose a base ref
..
head repository: facebookincubator/velox
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 0b3a8e8df6ab6b26d100ec00a3b4526506c39ffa
Choose a head ref
2 changes: 1 addition & 1 deletion build/deps/github_hashes/facebook/folly-rev.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Subproject commit 350d393db10ccf685547bf17824f8faffe3e83d4
Subproject commit 6b5e002deb49c3c9bdcb5a4568bc3265c6dff2c9
52 changes: 32 additions & 20 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
@@ -43,12 +43,19 @@ namespace {
::facebook::velox::error_code::kMemCapExceeded.c_str(), \
/* isRetriable */ true, \
"Memory allocation manually capped");

constexpr folly::StringPiece kRootNodeName{"__root__"};
} // namespace

MemoryPool::MemoryPool(
const std::string& name,
std::shared_ptr<MemoryPool> parent)
: name_(name), parent_(std::move(parent)) {}
std::shared_ptr<MemoryPool> parent,
uint16_t alignment)
: name_(name),
alignment_{alignment == 0 ? MemoryAllocator::kMinAlignment : alignment},
parent_(std::move(parent)) {
MemoryAllocator::validateAlignment(alignment_);
}

MemoryPool::~MemoryPool() {
VELOX_CHECK(children_.empty());
@@ -128,16 +135,15 @@ MemoryPoolImpl::MemoryPoolImpl(
MemoryManager& memoryManager,
const std::string& name,
std::shared_ptr<MemoryPool> parent,
int64_t cap)
: MemoryPool{name, parent},
const Options& options)
: MemoryPool{name, parent, options.alignment},
cap_{options.capacity},
memoryManager_{memoryManager},
localMemoryUsage_{},
cap_{cap},
allocator_{memoryManager_.getAllocator()} {
VELOX_USER_CHECK_GT(cap, 0);
VELOX_USER_CHECK_GT(cap_, 0);
}

/* static */
int64_t MemoryPoolImpl::sizeAlign(int64_t size) {
const auto remainder = size % alignment_;
return (remainder == 0) ? size : (size + alignment_ - remainder);
@@ -304,7 +310,7 @@ int64_t MemoryPoolImpl::cap() const {
return cap_;
}

uint16_t MemoryPoolImpl::getAlignment() const {
uint16_t MemoryPoolImpl::alignment() const {
return alignment_;
}

@@ -338,7 +344,8 @@ std::shared_ptr<MemoryPool> MemoryPoolImpl::genChild(
std::shared_ptr<MemoryPool> parent,
const std::string& name,
int64_t cap) {
return std::make_shared<MemoryPoolImpl>(memoryManager_, name, parent, cap);
return std::make_shared<MemoryPoolImpl>(
memoryManager_, name, parent, Options{alignment_, cap});
}

const MemoryUsage& MemoryPoolImpl::getLocalMemoryUsage() const {
@@ -389,7 +396,7 @@ void MemoryPoolImpl::reserve(int64_t size) {
// more conservative side.
release(size);
if (!success) {
VELOX_MEM_MANAGER_CAP_EXCEEDED(memoryManager_.getMemoryQuota());
VELOX_MEM_MANAGER_CAP_EXCEEDED(memoryManager_.capacity());
}
if (manualCap) {
VELOX_MEM_MANUAL_CAP();
@@ -410,17 +417,18 @@ void MemoryPoolImpl::release(int64_t size, bool mock) {
}
}

MemoryManager::MemoryManager(
int64_t memoryQuota,
MemoryAllocator* FOLLY_NONNULL allocator)
: allocator_{std::move(allocator)},
memoryQuota_{memoryQuota},
MemoryManager::MemoryManager(const Options& options)
: allocator_{options.allocator},
alignment_(options.alignment),
capacity_{options.capacity},
root_{std::make_shared<MemoryPoolImpl>(
*this,
kRootNodeName.str(),
nullptr,
memoryQuota)} {
VELOX_USER_CHECK_GE(memoryQuota_, 0);
MemoryPool::Options{alignment_, capacity_})} {
VELOX_CHECK_NOT_NULL(allocator_);
VELOX_USER_CHECK_GE(capacity_, 0);
MemoryAllocator::validateAlignment(alignment_);
}

MemoryManager::~MemoryManager() {
@@ -430,8 +438,12 @@ MemoryManager::~MemoryManager() {
}
}

int64_t MemoryManager::getMemoryQuota() const {
return memoryQuota_;
int64_t MemoryManager::capacity() const {
return capacity_;
}

uint16_t MemoryManager::alignment() const {
return alignment_;
}

MemoryPool& MemoryManager::getRoot() const {
@@ -452,7 +464,7 @@ int64_t MemoryManager::getTotalBytes() const {

bool MemoryManager::reserve(int64_t size) {
return totalBytes_.fetch_add(size, std::memory_order_relaxed) + size <=
memoryQuota_;
capacity_;
}

void MemoryManager::release(int64_t size) {
84 changes: 54 additions & 30 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
@@ -95,8 +95,18 @@ namespace memory {
/// be merged into memory pool object later.
class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
public:
struct Options {
/// Specifies the memory allocation alignment through this memory pool.
uint16_t alignment{MemoryAllocator::kMaxAlignment};
/// Specifies the memory capacity of this memory pool.
int64_t capacity{kMaxMemory};
};

/// Constructs a named memory pool with specified 'parent'.
MemoryPool(const std::string& name, std::shared_ptr<MemoryPool> parent);
MemoryPool(
const std::string& name,
std::shared_ptr<MemoryPool> parent,
uint16_t alignment);

/// Removes this memory pool's tracking from its parent through dropChild().
/// Drops the shared reference to its parent.
@@ -184,7 +194,9 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {

/// Returns the memory allocation alignment size applied internally by this
/// memory pool object.
virtual uint16_t getAlignment() const = 0;
virtual uint16_t alignment() const {
return alignment_;
}

/// Resource governing methods used to track and limit the memory usage
/// through this memory pool object.
@@ -253,7 +265,8 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
virtual void dropChild(const MemoryPool* FOLLY_NONNULL child);

const std::string name_;
std::shared_ptr<MemoryPool> parent_;
const uint16_t alignment_;
const std::shared_ptr<MemoryPool> parent_;

/// Protects 'children_'.
mutable folly::SharedMutex childrenMutex_;
@@ -271,7 +284,7 @@ class MemoryPoolImpl : public MemoryPool {
MemoryManager& memoryManager,
const std::string& name,
std::shared_ptr<MemoryPool> parent,
int64_t cap = kMaxMemory);
const Options& options = Options{});

~MemoryPoolImpl() {
if (const auto& tracker = getMemoryUsageTracker()) {
@@ -339,7 +352,7 @@ class MemoryPoolImpl : public MemoryPool {

int64_t cap() const override;

uint16_t getAlignment() const override;
uint16_t alignment() const override;

void capMemoryAllocation() override;

@@ -371,7 +384,7 @@ class MemoryPoolImpl : public MemoryPool {
private:
VELOX_FRIEND_TEST(MemoryPoolTest, Ctor);

static int64_t sizeAlign(int64_t size);
int64_t sizeAlign(int64_t size);

void* FOLLY_NULLABLE
reallocAligned(void* FOLLY_NULLABLE p, int64_t size, int64_t newSize) {
@@ -382,33 +395,44 @@ class MemoryPoolImpl : public MemoryPool {
std::function<void(const MemoryUsage&)> visitor) const;
void updateSubtreeMemoryUsage(std::function<void(MemoryUsage&)> visitor);

static constexpr uint16_t alignment_{MemoryAllocator::kMaxAlignment};
const int64_t cap_;
MemoryManager& memoryManager_;

// Memory allocated attributed to the memory node.
MemoryUsage localMemoryUsage_;
std::shared_ptr<MemoryUsageTracker> memoryUsageTracker_;
mutable folly::SharedMutex subtreeUsageMutex_;
MemoryUsage subtreeMemoryUsage_;
int64_t cap_;
std::atomic_bool capped_{false};

MemoryAllocator& allocator_;
};

constexpr folly::StringPiece kRootNodeName{"__root__"};

/// This class provides the interface of memory manager. The memory manager is
/// responsible for enforcing the memory usage quota as well as managing the
/// memory pools.
class IMemoryManager {
public:
virtual ~IMemoryManager() {}
struct Options {
/// Specifies the default memory allocation alignment.
uint16_t alignment{MemoryAllocator::kMaxAlignment};

/// Returns the total memory usage allowed under this memory manager.
/// The memory manager maintains this quota as a hard cap, and any allocation
/// that would exceed the quota throws.
virtual int64_t getMemoryQuota() const = 0;
/// Specifies the max memory capacity in bytes.
int64_t capacity{kMaxMemory};

/// Specifies the backing memory allocator.
MemoryAllocator* FOLLY_NONNULL allocator{MemoryAllocator::getInstance()};
};

virtual ~IMemoryManager() = default;

/// Returns the total memory usage in bytes allowed under this memory manager.
/// The memory manager maintains this capacity as a hard cap, and any
/// allocation that would exceed the quota throws.
virtual int64_t capacity() const = 0;

/// Returns the memory allocation alignment of this memory manager.
virtual uint16_t alignment() const = 0;

/// Power users that want to explicitly modify the tree should get the root of
/// the tree.
@@ -439,29 +463,28 @@ class IMemoryManager {
class MemoryManager final : public IMemoryManager {
public:
/// Tries to get the singleton memory manager. If not previously initialized,
/// the process singleton manager will be initialized with the given quota.
/// the process singleton manager will be initialized with the 'options'.
FOLLY_EXPORT static MemoryManager& getInstance(
int64_t quota = kMaxMemory,
bool ensureQuota = false) {
static MemoryManager manager{quota};
auto actualQuota = manager.getMemoryQuota();
const Options& options = Options{},
bool ensureCapacity = false) {
static MemoryManager manager{options};
auto actualCapacity = manager.capacity();
VELOX_USER_CHECK(
!ensureQuota || actualQuota == quota,
"Process level manager manager created with input_quota: {}, current_quota: {}",
quota,
actualQuota);
!ensureCapacity || actualCapacity == options.capacity,
"Process level manager manager created with input capacity: {}, actual capacity: {}",
options.capacity,
actualCapacity);

return manager;
}

explicit MemoryManager(
int64_t memoryQuota = kMaxMemory,
MemoryAllocator* FOLLY_NONNULL allocator =
MemoryAllocator::getInstance());
explicit MemoryManager(const Options& options = Options{});

~MemoryManager();

int64_t getMemoryQuota() const final;
int64_t capacity() const final;

uint16_t alignment() const final;

MemoryPool& getRoot() const final;

@@ -483,7 +506,8 @@ class MemoryManager final : public IMemoryManager {
VELOX_FRIEND_TEST(MultiThreadingUncappingTest, SimpleTree);

MemoryAllocator* const FOLLY_NONNULL allocator_;
const int64_t memoryQuota_;
const uint16_t alignment_;
const int64_t capacity_;

std::shared_ptr<MemoryPool> root_;
mutable folly::SharedMutex mutex_;
10 changes: 10 additions & 0 deletions velox/common/memory/MemoryAllocator.cpp
Original file line number Diff line number Diff line change
@@ -28,6 +28,16 @@
using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::memory {
/*static*/
void MemoryAllocator::validateAlignment(uint16_t alignment) {
if (alignment == 0) {
return;
}
VELOX_CHECK_LE(MemoryAllocator::kMinAlignment, alignment);
VELOX_CHECK_GE(MemoryAllocator::kMaxAlignment, alignment);
VELOX_CHECK_EQ(alignment & (alignment - 1), 0);
}

MemoryAllocator::Allocation::~Allocation() {
if (pool_ != nullptr) {
pool_->freeNonContiguous(*this);
2 changes: 2 additions & 0 deletions velox/common/memory/MemoryAllocator.h
Original file line number Diff line number Diff line change
@@ -185,6 +185,8 @@ class MemoryAllocator : std::enable_shared_from_this<MemoryAllocator> {
static constexpr uint16_t kMinAlignment = 8;
static constexpr uint16_t kMaxAlignment = 64;

static void validateAlignment(uint16_t alignment);

/// Represents a number of consecutive pages of kPageSize bytes.
class PageRun {
public:
11 changes: 6 additions & 5 deletions velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
@@ -27,12 +27,13 @@ class ByteStreamTest : public testing::TestWithParam<bool> {
protected:
void SetUp() override {
constexpr uint64_t kMaxMappedMemory = 64 << 20;
MmapAllocatorOptions options;
options.capacity = kMaxMappedMemory;
mmapAllocator_ = std::make_shared<MmapAllocator>(options);
MmapAllocatorOptions allocatorOptions;
allocatorOptions.capacity = kMaxMappedMemory;
mmapAllocator_ = std::make_shared<MmapAllocator>(allocatorOptions);
MemoryAllocator::setDefaultInstance(mmapAllocator_.get());
memoryManager_ = std::make_unique<MemoryManager>(
kMaxMemory, MemoryAllocator::getInstance());
IMemoryManager::Options mgrOptions;
mgrOptions.capacity = kMaxMemory;
memoryManager_ = std::make_unique<MemoryManager>(mgrOptions);
pool_ = memoryManager_->getChild();
}

5 changes: 4 additions & 1 deletion velox/common/memory/tests/MemoryAllocatorTest.cpp
Original file line number Diff line number Diff line change
@@ -187,7 +187,10 @@ class MemoryAllocatorTest : public testing::TestWithParam<TestParam> {
MemoryAllocator::setDefaultInstance(mockAllocator_.get());
}
instance_ = MemoryAllocator::getInstance();
memoryManager_ = std::make_unique<MemoryManager>(kMaxMemory, instance_);
IMemoryManager::Options options;
options.capacity = kMaxMemory;
options.allocator = instance_;
memoryManager_ = std::make_unique<MemoryManager>(options);
pool_ = memoryManager_->getChild();
}

63 changes: 55 additions & 8 deletions velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
@@ -33,20 +33,22 @@ TEST(MemoryManagerTest, Ctor) {
ASSERT_EQ(std::numeric_limits<int64_t>::max(), root.cap());
ASSERT_EQ(0, root.getChildCount());
ASSERT_EQ(0, root.getCurrentBytes());
ASSERT_EQ(std::numeric_limits<int64_t>::max(), manager.getMemoryQuota());
ASSERT_EQ(std::numeric_limits<int64_t>::max(), manager.capacity());
ASSERT_EQ(0, manager.getTotalBytes());
}
{
MemoryManager manager{8L * 1024 * 1024};
IMemoryManager::Options options;
options.capacity = 8UL * 1024 * 1024;
MemoryManager manager{options};
const auto& root = manager.getRoot();

ASSERT_EQ(8L * 1024 * 1024, root.cap());
ASSERT_EQ(0, root.getChildCount());
ASSERT_EQ(0, root.getCurrentBytes());
ASSERT_EQ(8L * 1024 * 1024, manager.getMemoryQuota());
ASSERT_EQ(8L * 1024 * 1024, manager.capacity());
ASSERT_EQ(0, manager.getTotalBytes());
{ ASSERT_ANY_THROW(MemoryManager manager{{.capacity = -1}}); }
}
{ ASSERT_ANY_THROW(MemoryManager manager{-1}); }
}

// TODO: when run sequentially, e.g. `buck run dwio/memory/...`, this has side
@@ -86,7 +88,9 @@ TEST(MemoryManagerTest, Reserve) {
ASSERT_EQ(0, manager.getTotalBytes());
}
{
MemoryManager manager{42};
IMemoryManager::Options options;
options.capacity = 42;
MemoryManager manager{options};
ASSERT_TRUE(manager.reserve(1));
ASSERT_TRUE(manager.reserve(1));
ASSERT_TRUE(manager.reserve(2));
@@ -108,10 +112,53 @@ TEST(MemoryManagerTest, Reserve) {

TEST(MemoryManagerTest, GlobalMemoryManagerQuota) {
auto& manager = MemoryManager::getInstance();
ASSERT_THROW(MemoryManager::getInstance(42, true), velox::VeloxUserError);
IMemoryManager::Options options;
options.capacity = 42;
ASSERT_THROW(
MemoryManager::getInstance(options, true), velox::VeloxUserError);

auto& coercedManager = MemoryManager::getInstance(42);
ASSERT_EQ(manager.getMemoryQuota(), coercedManager.getMemoryQuota());
auto& coercedManager = MemoryManager::getInstance(options);
ASSERT_EQ(manager.capacity(), coercedManager.capacity());
}

TEST(MemoryManagerTest, memoryAlignmentOptionCheck) {
struct {
uint16_t alignment;
bool expectedSuccess;

std::string debugString() const {
return fmt::format(
"alignment:{}, expectedSuccess:{}", alignment, expectedSuccess);
}
} testSettings[] = {
{0, true},
{MemoryAllocator::kMinAlignment - 1, false},
{MemoryAllocator::kMinAlignment, true},
{MemoryAllocator::kMinAlignment * 2, true},
{MemoryAllocator::kMinAlignment + 1, false},
{MemoryAllocator::kMaxAlignment - 1, false},
{MemoryAllocator::kMaxAlignment, true},
{MemoryAllocator::kMaxAlignment + 1, false},
{MemoryAllocator::kMaxAlignment * 2, false}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
IMemoryManager::Options options;
options.alignment = testData.alignment;
if (!testData.expectedSuccess) {
ASSERT_THROW(MemoryManager{options}, VeloxRuntimeError);
continue;
}
MemoryManager manager{options};
ASSERT_EQ(manager.alignment(), testData.alignment);
ASSERT_EQ(
manager.getRoot().alignment(),
testData.alignment == 0 ? MemoryAllocator::kMinAlignment
: testData.alignment);
ASSERT_EQ(
manager.getChild()->alignment(),
testData.alignment == 0 ? MemoryAllocator::kMinAlignment
: testData.alignment);
}
}
} // namespace memory
} // namespace velox
97 changes: 77 additions & 20 deletions velox/common/memory/tests/MemoryPoolTest.cpp
Original file line number Diff line number Diff line change
@@ -63,8 +63,15 @@ class MemoryPoolTest : public testing::TestWithParam<bool> {
MmapAllocator::setDefaultInstance(nullptr);
}

std::shared_ptr<IMemoryManager> getMemoryManager(int64_t quota) {
return std::make_shared<MemoryManager>(quota);
std::shared_ptr<IMemoryManager> getMemoryManager(uint64_t capacity) {
IMemoryManager::Options options;
options.capacity = capacity;
return getMemoryManager(options);
}

std::shared_ptr<IMemoryManager> getMemoryManager(
const IMemoryManager::Options& options) {
return std::make_shared<MemoryManager>(options);
}

const bool useMmap_;
@@ -73,7 +80,9 @@ class MemoryPoolTest : public testing::TestWithParam<bool> {
};

TEST(MemoryPoolTest, Ctor) {
MemoryManager manager{8 * GB};
IMemoryManager::Options options;
options.capacity = 8 * GB;
MemoryManager manager{options};
// While not recommended, the root allocator should be valid.
auto& root = dynamic_cast<MemoryPoolImpl&>(manager.getRoot());

@@ -82,8 +91,8 @@ TEST(MemoryPoolTest, Ctor) {
ASSERT_EQ(root.parent(), nullptr);

{
auto fakeRoot =
std::make_shared<MemoryPoolImpl>(manager, "fake_root", nullptr, 4 * GB);
auto fakeRoot = std::make_shared<MemoryPoolImpl>(
manager, "fake_root", nullptr, MemoryPool::Options{.capacity = 4 * GB});
ASSERT_EQ("fake_root", fakeRoot->name());
ASSERT_EQ(4 * GB, fakeRoot->cap_);
ASSERT_EQ(&root.allocator_, &fakeRoot->allocator_);
@@ -270,7 +279,9 @@ TEST(MemoryPoolTest, UncapMemory) {

// Mainly tests how it tracks externally allocated memory.
TEST(MemoryPoolTest, ReserveTest) {
MemoryManager manager{8 * GB};
IMemoryManager::Options options;
options.capacity = 8 * GB;
MemoryManager manager{options};
auto& root = manager.getRoot();

auto child = root.addChild("elastic_quota");
@@ -309,7 +320,9 @@ void testMmapMemoryAllocation(
const MmapAllocator* mmapAllocator,
MachinePageCount allocPages,
size_t allocCount) {
MemoryManager manager(8 * GB);
IMemoryManager::Options options;
options.capacity = 8 * GB;
MemoryManager manager(options);
const auto kPageSize = 4 * KB;

auto& root = manager.getRoot();
@@ -329,7 +342,7 @@ void testMmapMemoryAllocation(
ASSERT_TRUE(allocResult != nullptr);

// Write data to let mapped address to be backed by physical memory
memcpy(allocResult, buffer.data(), byteSize);
::memcpy(allocResult, buffer.data(), byteSize);
allocations.emplace_back(allocResult);
totalPageAllocated += pageIncrement;
totalPageMapped += pageIncrement;
@@ -381,7 +394,7 @@ TEST_P(MemoryPoolTest, AllocTest) {
const int64_t kChunkSize{32L * MB};

void* oneChunk = child->allocate(kChunkSize);
ASSERT_EQ(reinterpret_cast<uint64_t>(oneChunk) % child->getAlignment(), 0);
ASSERT_EQ(reinterpret_cast<uint64_t>(oneChunk) % child->alignment(), 0);
ASSERT_EQ(kChunkSize, child->getCurrentBytes());
ASSERT_EQ(kChunkSize, child->getMaxBytes());

@@ -442,7 +455,7 @@ TEST_P(MemoryPoolTest, ReallocTestHigher) {
EXPECT_EQ(3 * kChunkSize, pool->getMaxBytes());
}

TEST_P(MemoryPoolTest, ReallocTestLower) {
TEST_P(MemoryPoolTest, reallocTestLower) {
auto manager = getMemoryManager(8 * GB);
auto& root = manager->getRoot();
auto pool = root.addChild("elastic_quota");
@@ -462,7 +475,7 @@ TEST_P(MemoryPoolTest, ReallocTestLower) {
EXPECT_EQ(3 * kChunkSize, pool->getMaxBytes());
}

TEST_P(MemoryPoolTest, CapAllocation) {
TEST_P(MemoryPoolTest, capAllocation) {
auto manager = getMemoryManager(8 * GB);
auto& root = manager->getRoot();

@@ -509,7 +522,7 @@ TEST_P(MemoryPoolTest, allocateZeroFilled) {
SCOPED_TRACE(
fmt::format("numEntries{}, sizeEach{}", numEntries, sizeEach));
void* ptr = pool->allocateZeroFilled(numEntries, sizeEach);
ASSERT_EQ(reinterpret_cast<uint64_t>(ptr) % pool->getAlignment(), 0);
ASSERT_EQ(reinterpret_cast<uint64_t>(ptr) % pool->alignment(), 0);
allocationPtrs.push_back(ptr);
allocationSizes.push_back(numEntries * sizeEach);
}
@@ -521,7 +534,9 @@ TEST_P(MemoryPoolTest, allocateZeroFilled) {
}

TEST_P(MemoryPoolTest, MemoryCapExceptions) {
MemoryManager manager{127L * MB};
IMemoryManager::Options options;
options.capacity = 127L * MB;
MemoryManager manager{options};
auto& root = manager.getRoot();

auto pool = root.addChild("static_quota", 63L * MB);
@@ -572,13 +587,10 @@ TEST_P(MemoryPoolTest, MemoryCapExceptions) {
}
}

TEST_P(MemoryPoolTest, GetAlignment) {
MemoryManager manager{32 * MB};
EXPECT_EQ(MemoryAllocator::kMaxAlignment, manager.getRoot().getAlignment());
}

TEST_P(MemoryPoolTest, MemoryManagerGlobalCap) {
MemoryManager manager{32 * MB};
IMemoryManager::Options options;
options.capacity = 32 * MB;
MemoryManager manager{options};

auto& root = manager.getRoot();
auto pool = root.addChild("unbounded");
@@ -601,7 +613,9 @@ TEST_P(MemoryPoolTest, MemoryManagerGlobalCap) {
// and what it returns for getCurrentBytes()/getMaxBytes and
// with memoryUsageTracker.
TEST_P(MemoryPoolTest, childUsageTest) {
MemoryManager manager{8 * GB};
IMemoryManager::Options options;
options.capacity = 8 * GB;
MemoryManager manager{options};
auto& root = manager.getRoot();

auto pool = root.addChild("main_pool");
@@ -1099,6 +1113,49 @@ DEBUG_ONLY_TEST_P(MemoryPoolTest, nonContiguousAllocateError) {
ASSERT_TRUE(allocation->empty());
}

TEST_P(MemoryPoolTest, alignmentCheck) {
std::vector<uint16_t> alignments = {
0,
MemoryAllocator::kMinAlignment,
MemoryAllocator::kMinAlignment * 2,
MemoryAllocator::kMaxAlignment};
for (const auto& alignment : alignments) {
SCOPED_TRACE(fmt::format("alignment:{}", alignment));
IMemoryManager::Options options;
options.capacity = 8 * GB;
options.alignment = alignment;
auto manager = getMemoryManager(options);
auto& root = manager->getRoot();
ASSERT_EQ(
root.alignment(),
alignment == 0 ? MemoryAllocator::kMinAlignment : alignment);
const int32_t kTestIterations = 10;
for (int32_t i = 0; i < 10; ++i) {
const int64_t bytesToAlloc = 1 + folly::Random::rand32() % (1 << 20);
void* ptr = root.allocate(bytesToAlloc);
if (alignment != 0) {
ASSERT_EQ(reinterpret_cast<uint64_t>(ptr) % alignment, 0);
}
root.free(ptr, bytesToAlloc);
}
ASSERT_EQ(0, root.getCurrentBytes());

auto child = manager->getChild();
ASSERT_EQ(
child->alignment(),
alignment == 0 ? MemoryAllocator::kMinAlignment : alignment);
for (int32_t i = 0; i < 10; ++i) {
const int64_t bytesToAlloc = 1 + folly::Random::rand32() % (1 << 20);
void* ptr = child->allocate(bytesToAlloc);
if (alignment != 0) {
ASSERT_EQ(reinterpret_cast<uint64_t>(ptr) % alignment, 0);
}
child->free(ptr, bytesToAlloc);
}
ASSERT_EQ(0, child->getCurrentBytes());
}
}

VELOX_INSTANTIATE_TEST_SUITE_P(
MemoryPoolTestSuite,
MemoryPoolTest,
31 changes: 0 additions & 31 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
@@ -587,24 +587,6 @@ class GroupIdNode : public PlanNode {
std::string groupIdName,
PlanNodePtr source);

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
/// TODO Remove after updating Prestissimo.
GroupIdNode(
PlanNodeId id,
std::vector<std::vector<FieldAccessTypedExprPtr>> groupingSets,
std::map<std::string, FieldAccessTypedExprPtr> outputGroupingKeyNames,
std::vector<FieldAccessTypedExprPtr> aggregationInputs,
std::string groupIdName,
PlanNodePtr source)
: GroupIdNode(
std::move(id),
std::move(groupingSets),
makeGroupingKeyInfos(outputGroupingKeyNames),
std::move(aggregationInputs),
std::move(groupIdName),
std::move(source)) {}
#endif

const RowTypePtr& outputType() const override {
return outputType_;
}
@@ -639,19 +621,6 @@ class GroupIdNode : public PlanNode {
}

private:
#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
/// TODO Remove after updating Prestissimo.
static std::vector<GroupIdNode::GroupingKeyInfo> makeGroupingKeyInfos(
const std::map<std::string, FieldAccessTypedExprPtr>&
outputGroupingKeyNames) {
std::vector<GroupIdNode::GroupingKeyInfo> infos;
for (const auto& [name, field] : outputGroupingKeyNames) {
infos.push_back({name, field});
}
return infos;
}
#endif

void addDetails(std::stringstream& stream) const override;

const std::vector<PlanNodePtr> sources_;
9 changes: 9 additions & 0 deletions velox/docs/functions/json.rst
Original file line number Diff line number Diff line change
@@ -124,6 +124,15 @@ JSON Functions

SELECT json_array_contains('[1, 2, 3]', 2);

.. function:: json_size(json, value) -> bigint

Returns the size of the ``value``. For ``objects`` or ``arrays``, the size
is the number of members, and the size of a ``scalar`` value is zero::

SELECT json_size('{"x": {"a": 1, "b": 2}}', '$.x'); -- 2
SELECT json_size('{"x": [1, 2, 3]}', '$.x'); -- 3
SELECT json_size('{"x": {"a": 1, "b": 2}}', '$.x.a'); -- 0

.. function:: json_format(json) -> varchar

Serializes the input JSON value to JSON text conforming to RFC 7159.
3 changes: 3 additions & 0 deletions velox/dwio/common/Range.h
Original file line number Diff line number Diff line change
@@ -32,6 +32,9 @@ possible.
class Ranges {
public:
void add(size_t begin, size_t end) {
if (begin == end) {
return;
}
DWIO_ENSURE_LT(begin, end);
size_ += (end - begin);
if (ranges_.size()) {
2 changes: 1 addition & 1 deletion velox/dwio/common/tests/RangeTests.cpp
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ namespace facebook::velox::common {
TEST(RangeTests, Add) {
Ranges ranges;
ASSERT_THROW(ranges.add(2, 1), exception::LoggedException);
ASSERT_THROW(ranges.add(2, 2), exception::LoggedException);
ranges.add(2, 2);
ranges.add(1, 3);
ASSERT_THAT(ranges.ranges_, ElementsAre(std::tuple<size_t, size_t>{1, 3}));
ASSERT_EQ(ranges.size(), 2);
8 changes: 4 additions & 4 deletions velox/dwio/dwrf/test/WriterFlushTest.cpp
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ class MockMemoryPool : public velox::memory::MemoryPool {
const std::string& name,
std::shared_ptr<MemoryPool> parent,
int64_t cap = std::numeric_limits<int64_t>::max())
: MemoryPool{name, parent}, cap_{cap} {}
: MemoryPool{name, parent, 0}, cap_{cap} {}

// Methods not usually exposed by MemoryPool interface to
// allow for manipulation.
@@ -108,8 +108,8 @@ class MockMemoryPool : public velox::memory::MemoryPool {

bool allocateContiguous(
velox::memory::MachinePageCount /*unused*/,
velox::memory::MemoryAllocator::ContiguousAllocation& /*unused*/)
override {
velox::memory::MemoryAllocator::ContiguousAllocation&
/*unused*/) override {
VELOX_UNSUPPORTED("allocateContiguous unsupported");
}

@@ -158,7 +158,7 @@ class MockMemoryPool : public velox::memory::MemoryPool {
setMemoryUsageTracker,
void(const std::shared_ptr<velox::memory::MemoryUsageTracker>&));

MOCK_CONST_METHOD0(getAlignment, uint16_t());
MOCK_CONST_METHOD0(alignment, uint16_t());

private:
velox::memory::MemoryAllocator* const FOLLY_NONNULL allocator_{
17 changes: 15 additions & 2 deletions velox/expression/Expr.cpp
Original file line number Diff line number Diff line change
@@ -313,7 +313,14 @@ void Expr::evalSimplifiedImpl(
}

// Apply the actual function.
vectorFunction_->apply(remainingRows, inputValues_, type(), context, result);
try {
vectorFunction_->apply(
remainingRows, inputValues_, type(), context, result);
} catch (const VeloxException& ve) {
throw;
} catch (const std::exception& e) {
VELOX_USER_FAIL(e.what());
}

// Make sure the returned vector has its null bitmap properly set.
addNulls(rows, remainingRows.asRange().bits(), context, result);
@@ -1438,7 +1445,13 @@ void Expr::applyFunction(
? computeIsAsciiForResult(vectorFunction_.get(), inputValues_, rows)
: std::nullopt;

vectorFunction_->apply(rows, inputValues_, type(), context, result);
try {
vectorFunction_->apply(rows, inputValues_, type(), context, result);
} catch (const VeloxException& ve) {
throw;
} catch (const std::exception& e) {
VELOX_USER_FAIL(e.what());
}

if (!result) {
LocalSelectivityVector mutableRemainingRowsHolder(context);
74 changes: 64 additions & 10 deletions velox/expression/tests/ExprTest.cpp
Original file line number Diff line number Diff line change
@@ -53,13 +53,13 @@ class ExprTest : public testing::Test, public VectorTestBase {
return core::Expressions::inferTypes(untyped, rowType, execCtx_->pool());
}

std::unique_ptr<exec::ExprSet> compileExpression(
template <typename T = exec::ExprSet>
std::unique_ptr<T> compileExpression(
const std::string& expr,
const RowTypePtr& rowType) {
std::vector<core::TypedExprPtr> expressions = {
parseExpression(expr, rowType)};
return std::make_unique<exec::ExprSet>(
std::move(expressions), execCtx_.get());
return std::make_unique<T>(std::move(expressions), execCtx_.get());
}

std::unique_ptr<exec::ExprSet> compileMultiple(
@@ -91,7 +91,13 @@ class ExprTest : public testing::Test, public VectorTestBase {
return evaluateMultiple({text}, input)[0];
}

VectorPtr evaluate(exec::ExprSet* exprSet, const RowVectorPtr& input) {
template <
typename T = exec::ExprSet,
typename = std::enable_if_t<
std::is_same_v<T, exec::ExprSet> ||
std::is_same_v<T, exec::ExprSetSimplified>,
bool>>
VectorPtr evaluate(T* exprSet, const RowVectorPtr& input) {
exec::EvalCtx context(execCtx_.get(), exprSet, input.get());

SelectivityVector rows(input->size());
@@ -235,6 +241,21 @@ class ExprTest : public testing::Test, public VectorTestBase {
}
}

void assertErrorSimplified(
const std::string& expression,
const VectorPtr& input,
const std::string& message) {
try {
auto inputVector = makeRowVector({input});
auto exprSetSimplified = compileExpression<exec::ExprSetSimplified>(
expression, asRowType(inputVector->type()));
evaluate<exec::ExprSetSimplified>(exprSetSimplified.get(), inputVector);
ASSERT_TRUE(false) << "Expected an error";
} catch (VeloxException& e) {
ASSERT_EQ(message, e.message());
}
}

std::exception_ptr assertWrappedException(
const std::string& expression,
const VectorPtr& input,
@@ -2985,19 +3006,30 @@ TEST_F(ExprTest, addNulls) {
}

namespace {

// Throw a VeloxException if veloxException_ is true. Throw an std exception
// otherwise.
class AlwaysThrowsVectorFunction : public exec::VectorFunction {
public:
static constexpr const char* kErrorMessage = "Expected";
static constexpr const char* kVeloxErrorMessage = "Velox Exception: Expected";
static constexpr const char* kStdErrorMessage = "Std Exception: Expected";

explicit AlwaysThrowsVectorFunction(bool veloxException)
: veloxException_{veloxException} {}

void apply(
const SelectivityVector& rows,
std::vector<VectorPtr>& /* args */,
const TypePtr& /* outputType */,
exec::EvalCtx& context,
VectorPtr& /* result */) const override {
auto error = std::make_exception_ptr(std::invalid_argument(kErrorMessage));
context.setErrors(rows, error);
return;
if (veloxException_) {
auto error =
std::make_exception_ptr(std::invalid_argument(kVeloxErrorMessage));
context.setErrors(rows, error);
return;
}
throw std::invalid_argument(kStdErrorMessage);
}

static std::vector<std::shared_ptr<exec::FunctionSignature>> signatures() {
@@ -3006,6 +3038,9 @@ class AlwaysThrowsVectorFunction : public exec::VectorFunction {
.argumentType("integer")
.build()};
}

private:
const bool veloxException_;
};

class NoOpVectorFunction : public exec::VectorFunction {
@@ -3034,7 +3069,7 @@ TEST_F(ExprTest, applyFunctionNoResult) {
exec::registerVectorFunction(
"always_throws_vector_function",
AlwaysThrowsVectorFunction::signatures(),
std::make_unique<AlwaysThrowsVectorFunction>());
std::make_unique<AlwaysThrowsVectorFunction>(true));

// At various places in the code, we don't check if result has been set or
// not. Conjuncts have the nice property that they set throwOnError to
@@ -3044,7 +3079,7 @@ TEST_F(ExprTest, applyFunctionNoResult) {
makeFlatVector<int32_t>({1, 2, 3}),
"always_throws_vector_function(c0)",
"and(always_throws_vector_function(c0), 1:BOOLEAN)",
AlwaysThrowsVectorFunction::kErrorMessage);
AlwaysThrowsVectorFunction::kVeloxErrorMessage);

exec::registerVectorFunction(
"no_op",
@@ -3099,3 +3134,22 @@ TEST_F(ExprTest, constantWrap) {
assertEqualVectors(
makeNullableFlatVector<bool>({std::nullopt, true, false, true}), result);
}

TEST_F(ExprTest, stdExceptionInVectorFunction) {
exec::registerVectorFunction(
"always_throws_vector_function",
AlwaysThrowsVectorFunction::signatures(),
std::make_unique<AlwaysThrowsVectorFunction>(false));

assertError(
"always_throws_vector_function(c0)",
makeFlatVector<int32_t>({1, 2, 3}),
"always_throws_vector_function(c0)",
"Same as context.",
AlwaysThrowsVectorFunction::kStdErrorMessage);

assertErrorSimplified(
"always_throws_vector_function(c0)",
makeFlatVector<int32_t>({1, 2, 3}),
AlwaysThrowsVectorFunction::kStdErrorMessage);
}
30 changes: 28 additions & 2 deletions velox/functions/prestosql/JsonFunctions.h
Original file line number Diff line number Diff line change
@@ -23,12 +23,12 @@ namespace facebook::velox::functions {
template <typename T>
struct JsonFormatFunction {
VELOX_DEFINE_FUNCTION_TYPES(T);
FOLLY_ALWAYS_INLINE bool call(

FOLLY_ALWAYS_INLINE void call(
out_type<Varchar>& jsonString,
const arg_type<Json>& json) {
folly::parseJson(json.getString());
jsonString.setNoCopy(json);
return true;
}
};

@@ -125,4 +125,30 @@ struct JsonArrayContainsFunction {
}
};

template <typename T>
struct JsonSizeFunction {
VELOX_DEFINE_FUNCTION_TYPES(T);

FOLLY_ALWAYS_INLINE bool call(
int64_t& result,
const arg_type<Json>& json,
const arg_type<Varchar>& jsonPath) {
const folly::StringPiece& jsonStringPiece = json;
const folly::StringPiece& jsonPathStringPiece = jsonPath;
auto extractResult = jsonExtract(jsonStringPiece, jsonPathStringPiece);
if (!extractResult.has_value()) {
return false;
}
// The size of the object or array is the number of members, otherwise the
// size is zero
if (extractResult->isArray() || extractResult->isObject()) {
result = extractResult->size();
} else {
result = 0;
}

return true;
}
};

} // namespace facebook::velox::functions
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ void registerJsonFunctions() {
{"json_array_contains"});
registerFunction<JsonArrayContainsFunction, bool, Json, Varchar>(
{"json_array_contains"});
registerFunction<JsonSizeFunction, int64_t, Json, Varchar>({"json_size"});
registerFunction<JsonFormatFunction, Varchar, Json>({"json_format"});
}

39 changes: 39 additions & 0 deletions velox/functions/prestosql/tests/JsonFunctionsTest.cpp
Original file line number Diff line number Diff line change
@@ -39,6 +39,12 @@ class JsonFunctionsTest : public functions::test::FunctionBaseTest {
return evaluateOnce<bool>("json_array_contains(c0, c1)", json, value);
}

std::optional<int64_t> json_size(
std::optional<std::string> json,
std::optional<std::string> path) {
return evaluateOnce<int64_t>("json_size(c0, c1)", json, path);
}

static std::unordered_set<std::string> getSignatureStrings(
const std::string& functionName) {
auto allSignatures = getFunctionSignatures();
@@ -114,6 +120,13 @@ TEST_F(JsonFunctionsTest, jsonArrayContainsSignatures) {
ASSERT_EQ(1, signatures.count("(json,boolean) -> boolean"));
}

TEST_F(JsonFunctionsTest, jsonSizeSignatures) {
auto signatures = getSignatureStrings("json_size");
ASSERT_EQ(1, signatures.size());

ASSERT_EQ(1, signatures.count("(json,varchar) -> bigint"));
}

TEST_F(JsonFunctionsTest, isJsonScalar) {
// Scalars.
EXPECT_EQ(is_json_scalar(R"(1)"), true);
@@ -325,6 +338,32 @@ TEST_F(JsonFunctionsTest, jsonArrayContainsString) {
true);
}

TEST_F(JsonFunctionsTest, jsonSize) {
EXPECT_EQ(json_size(R"({"k1":{"k2": 999}, "k3": 1})", "$.k1.k2"), 0);
EXPECT_EQ(json_size(R"({"k1":{"k2": 999}, "k3": 1})", "$.k1"), 1);
EXPECT_EQ(json_size(R"({"k1":{"k2": 999}, "k3": 1})", "$"), 2);
EXPECT_EQ(json_size(R"({"k1":{"k2": 999}, "k3": 1})", "$.k3"), 0);
EXPECT_EQ(json_size(R"({"k1":{"k2": 999}, "k3": [1, 2, 3, 4]})", "$.k3"), 4);
EXPECT_EQ(json_size(R"({"k1":{"k2": 999}, "k3": 1})", "$.k4"), std::nullopt);
EXPECT_EQ(json_size(R"({"k1":{"k2": 999}, "k3"})", "$.k4"), std::nullopt);
EXPECT_EQ(json_size(R"({"k1":{"k2": 999}, "k3": true})", "$.k3"), 0);
EXPECT_EQ(json_size(R"({"k1":{"k2": 999}, "k3": null})", "$.k3"), 0);
EXPECT_EQ(
json_size(
R"({"k1":{"k2": 999, "k3": [{"k4": [1, 2, 3]}]}})", "$.k1.k3[0].k4"),
3);
}

TEST_F(JsonFunctionsTest, invalidPath) {
EXPECT_THROW(json_size(R"([0,1,2])", ""), VeloxUserError);
EXPECT_THROW(json_size(R"([0,1,2])", "$[]"), VeloxUserError);
EXPECT_THROW(json_size(R"([0,1,2])", "$[-1]"), VeloxUserError);
EXPECT_THROW(json_size(R"({"k1":"v1"})", "$k1"), VeloxUserError);
EXPECT_THROW(json_size(R"({"k1":"v1"})", "$.k1."), VeloxUserError);
EXPECT_THROW(json_size(R"({"k1":"v1"})", "$.k1]"), VeloxUserError);
EXPECT_THROW(json_size(R"({"k1":"v1)", "$.k1]"), VeloxUserError);
}

} // namespace

} // namespace facebook::velox::functions::prestosql