Skip to content

Commit

Permalink
fix: Fix importing long decimal vector from Arrow (#11404)
Browse files Browse the repository at this point in the history
Summary:
When integrating the Spark query runner with the Spark expression fuzzer test,
we found that it core dumps at the point below when copying a decimal vector
whose memory is allocated by 'arrow::ipc::RecordBatchReader'.

https://github.com/facebookincubator/velox/blob/7b2bb7f672b435d38d5a83f9cd8441bf17b564e6/velox/vector/FlatVector-inl.h#L198

The reason is that Arrow uses two uint64_t values to represent a 128-bit
decimal value, so the allocated memory might not be 16-byte aligned. This PR
copies long decimal values in 'importFromArrowImpl' to ensure the alignment.

#2388

Pull Request resolved: #11404

Reviewed By: pedroerp

Differential Revision: D66307435

Pulled By: kagamiori

fbshipit-source-id: 86081c041169cfc196e68f36629a246f8626d3d9
  • Loading branch information
rui-mo authored and facebook-github-bot committed Nov 21, 2024
1 parent ebfb1e5 commit df3266c
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 0 deletions.
17 changes: 17 additions & 0 deletions velox/functions/sparksql/fuzzer/tests/SparkQueryRunnerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <gtest/gtest.h>

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/dwio/parquet/RegisterParquetWriter.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/functions/sparksql/Register.h"
Expand All @@ -37,6 +38,7 @@ class SparkQueryRunnerTest : public ::testing::Test,
protected:
// One-time setup for the whole test case: installs a memory manager instance
// for testing (passing `{}` as its argument) and then registers the Parquet
// writer factory.
// NOTE(review): the Parquet writer registration is presumably required
// because the Spark query runner writes its input as Parquet — confirm
// against SparkQueryRunner.
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance({});
parquet::registerParquetWriterFactory();
}

void SetUp() override {
Expand Down Expand Up @@ -80,6 +82,21 @@ TEST_F(SparkQueryRunnerTest, DISABLED_basic) {
exec::test::assertEqualResults(sparkResults, outputType, {expected});
}

// Sends a constant long decimal column through Spark and checks that
// 'abs(c0)' returns the same rows. Disabled by default because it needs a
// Spark coordinator running at localhost.
TEST_F(SparkQueryRunnerTest, DISABLED_decimal) {
  auto pool = rootPool_->addAggregateChild("decimal");
  auto runner = std::make_unique<fuzzer::SparkQueryRunner>(
      pool.get(), "localhost:15002", "test", "decimal");

  // 25 rows, each holding the same positive value, so abs() is an identity.
  auto data = makeRowVector({
      makeConstant<int128_t>(123456789, 25, DECIMAL(34, 2)),
  });
  auto resultType = ROW({"a"}, {DECIMAL(34, 2)});

  auto actual = runner->execute("SELECT abs(c0) FROM tmp", {data}, resultType);
  exec::test::assertEqualResults(actual, resultType, {data});
}

// This test requires a Spark coordinator running at localhost, so disable it
// by default.
TEST_F(SparkQueryRunnerTest, DISABLED_fuzzer) {
Expand Down
26 changes: 26 additions & 0 deletions velox/vector/arrow/Bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,24 @@ VectorPtr createShortDecimalVector(
pool, type, nulls, length, values, nullCount);
}

// Builds a flat HUGEINT vector from an Arrow long decimal (128-bit) values
// buffer.
//
// Arrow uses two uint64_t values to represent a 128-bit decimal value. The
// memory allocated by Arrow might not be 16-byte aligned, so the values are
// copied into a newly allocated buffer to ensure 16-byte alignment instead of
// being wrapped in place.
//
// @param pool Memory pool used to allocate the copied values buffer.
// @param type Type passed through to the resulting vector.
// @param nulls Nulls buffer passed through to the resulting vector.
// @param input Arrow values buffer holding 'length' 128-bit values. It is not
// read when 'length' is zero.
// @param length Number of values to copy.
// @param nullCount Null count passed through to the resulting vector.
VectorPtr createLongDecimalVector(
    memory::MemoryPool* pool,
    const TypePtr& type,
    BufferPtr nulls,
    const int128_t* input,
    size_t length,
    int64_t nullCount) {
  auto values = AlignedBuffer::allocate<int128_t>(length, pool);

  // The Arrow C data interface allows a buffer pointer to be null when the
  // buffer is empty, and passing a null pointer to memcpy is undefined
  // behavior even for a zero-byte copy. Skip the copy for empty arrays.
  if (length > 0) {
    auto* rawValues = values->asMutable<int128_t>();
    memcpy(rawValues, input, length * sizeof(int128_t));
  }

  return createFlatVector<TypeKind::HUGEINT>(
      pool, type, nulls, length, values, nullCount);
}

// Returns true when the schema's format string starts with "+r", which is the
// Arrow C data interface format for run-end encoded arrays. The second
// character is only inspected when the first one is '+'.
bool isREE(const ArrowSchema& arrowSchema) {
  const auto* fmt = arrowSchema.format;
  if (fmt[0] != '+') {
    return false;
  }
  return fmt[1] == 'r';
}
Expand Down Expand Up @@ -1960,6 +1978,14 @@ VectorPtr importFromArrowImpl(
static_cast<const int128_t*>(arrowArray.buffers[1]),
arrowArray.length,
arrowArray.null_count);
} else if (type->isLongDecimal()) {
return createLongDecimalVector(
pool,
type,
nulls,
static_cast<const int128_t*>(arrowArray.buffers[1]),
arrowArray.length,
arrowArray.null_count);
} else if (type->isRow()) {
// Row/structs.
return createRowVector(
Expand Down
8 changes: 8 additions & 0 deletions velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,14 @@ class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest {

testArrowImport<int64_t, int128_t>(
"d:5,2", {1, -1, 0, 12345, -12345, std::nullopt});
testArrowImport<int128_t, int128_t>(
"d:36,2",
{HugeInt::parse("20000000000000000"),
HugeInt::parse("50000000000000000"),
0,
HugeInt::parse("50000000000000000000"),
HugeInt::parse("-40000000000000000000"),
std::nullopt});
}

template <typename TOutput, typename TInput>
Expand Down

0 comments on commit df3266c

Please sign in to comment.