Skip to content

Commit

Permalink
Implement double/int -> pg numeric conversion YQL-16767 (#727)
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius authored Dec 28, 2023
1 parent 3422227 commit 9150df9
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 0 deletions.
53 changes: 53 additions & 0 deletions ydb/library/yql/parser/pg_wrapper/arrow.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "arrow.h"
#include "arrow_impl.h"
#include <ydb/library/yql/parser/pg_wrapper/interface/arrow.h>
#include <ydb/library/yql/parser/pg_wrapper/interface/utils.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
Expand Down Expand Up @@ -151,6 +152,55 @@ std::shared_ptr<arrow::Array> PgConvertString(const std::shared_ptr<arrow::Array
return ret;
}

Numeric PgFloatToNumeric(double item, ui64 scale, int digits) {
double intPart, fracPart;
bool error;
fracPart = modf(item, &intPart);
i64 fracInt = round(fracPart * scale);

// scale compaction: represent 711.56000 as 711.56
while (digits > 0 && fracInt % 10 == 0) {
fracInt /= 10;
digits -= 1;
}

if (digits == 0) {
return int64_to_numeric(intPart);
} else {
return numeric_add_opt_error(
int64_to_numeric(intPart),
int64_div_fast_to_numeric(fracInt, digits),
&error);
}
}

TColumnConverter BuildPgNumericColumnConverter(const std::shared_ptr<arrow::DataType>& originalType) {
switch (originalType->id()) {
case arrow::Type::INT16:
return [](const std::shared_ptr<arrow::Array>& value) {
return PgConvertNumeric<i16>(value);
};
case arrow::Type::INT32:
return [](const std::shared_ptr<arrow::Array>& value) {
return PgConvertNumeric<i32>(value);
};
case arrow::Type::INT64:
return [](const std::shared_ptr<arrow::Array>& value) {
return PgConvertNumeric<i64>(value);
};
case arrow::Type::FLOAT:
return [](const std::shared_ptr<arrow::Array>& value) {
return PgConvertNumeric<float>(value);
};
case arrow::Type::DOUBLE:
return [](const std::shared_ptr<arrow::Array>& value) {
return PgConvertNumeric<double>(value);
};
default:
return {};
}
}

template <typename T, typename F>
TColumnConverter BuildPgFixedColumnConverter(const std::shared_ptr<arrow::DataType>& originalType, const F& f) {
auto primaryType = NKikimr::NMiniKQL::GetPrimitiveDataType<T>();
Expand Down Expand Up @@ -200,6 +250,9 @@ TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>&
case FLOAT8OID: {
return BuildPgFixedColumnConverter<double>(originalType, [](auto value){ return Float8GetDatum(value); });
}
case NUMERICOID: {
return BuildPgNumericColumnConverter(originalType);
}
case BYTEAOID:
case VARCHAROID:
case TEXTOID:
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/parser/pg_wrapper/arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -1299,3 +1299,4 @@ TExecFunc FindExec(Oid oid);
const NPg::TAggregateDesc& ResolveAggregation(const TString& name, NKikimr::NMiniKQL::TTupleType* tupleType, const std::vector<ui32>& argsColumns, NKikimr::NMiniKQL::TType* returnType);

}

48 changes: 48 additions & 0 deletions ydb/library/yql/parser/pg_wrapper/arrow_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include <arrow/array.h>
#include <arrow/array/builder_binary.h>

extern "C" {
#include "utils/numeric.h"
}

namespace NYql {

Numeric PgFloatToNumeric(double item, ui64 scale, int digits);

template<typename T>
std::shared_ptr<arrow::Array> PgConvertNumeric(const std::shared_ptr<arrow::Array>& value) {
TArenaMemoryContext arena;
const auto& data = value->data();
size_t length = data->length;
arrow::BinaryBuilder builder;
auto input = data->GetValues<T>(1);
for (size_t i = 0; i < length; ++i) {
if (value->IsNull(i)) {
builder.AppendNull();
continue;
}
T item = input[i];
Numeric v;
if constexpr(std::is_same_v<T,double>) {
v = PgFloatToNumeric(item, 1000000000000LL, 12);
} else if constexpr(std::is_same_v<T,float>) {
v = PgFloatToNumeric(item, 1000000LL, 6);
} else {
v = int64_to_numeric(item);
}
auto datum = NumericGetDatum(v);
auto ptr = (char*)datum;
auto len = GetFullVarSize((const text*)datum);
NUdf::ZeroMemoryContext(ptr);
ARROW_OK(builder.Append(ptr - sizeof(void*), len + sizeof(void*)));
}

std::shared_ptr<arrow::BinaryArray> ret;
ARROW_OK(builder.Finish(&ret));
return ret;
}

}

112 changes: 112 additions & 0 deletions ydb/library/yql/parser/pg_wrapper/ut/arrow_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#include <arrow/api.h>
#include <arrow/array.h>

#include <library/cpp/testing/unittest/registar.h>

#include "arrow.h"
#include "arrow_impl.h"

extern "C" {
#include "utils/fmgrprotos.h"
}

namespace NYql {

Y_UNIT_TEST_SUITE(TArrowUtilsTests) {

Y_UNIT_TEST(TestPgFloatToNumeric) {
TArenaMemoryContext arena;
auto n = PgFloatToNumeric(711.56, 1000000000000LL, 12);
auto value = TString(DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(n))));
UNIT_ASSERT_VALUES_EQUAL(value, "711.56");

n = PgFloatToNumeric(-711.56, 1000000000000LL, 12);
value = TString(DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(n))));
UNIT_ASSERT_VALUES_EQUAL(value, "-711.56");

n = PgFloatToNumeric(711.56f, 100000LL, 5);
value = TString(DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(n))));
UNIT_ASSERT_VALUES_EQUAL(value, "711.56");

n = PgFloatToNumeric(-711.56f, 100000LL, 5);
value = TString(DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(n))));
UNIT_ASSERT_VALUES_EQUAL(value, "-711.56");
}


Y_UNIT_TEST(PgConvertNumericDouble) {
TArenaMemoryContext arena;

arrow::DoubleBuilder builder;
builder.Append(1.1);
builder.Append(31.37);
builder.AppendNull();
builder.Append(-1.337);
builder.Append(0.0);

std::shared_ptr<arrow::Array> array;
builder.Finish(&array);

auto result = PgConvertNumeric<double>(array);
const auto& data = result->data();

const char* expected[] = {
"1.1", "31.37", nullptr, "-1.337", "0"
};

NUdf::TStringBlockReader<arrow::BinaryType, true> reader;
for (int i = 0; i < 5; i++) {
auto item = reader.GetItem(*data, i);
if (!item) {
UNIT_ASSERT(expected[i] == nullptr);
} else {
const char* addr = item.AsStringRef().Data() + sizeof(void*);
UNIT_ASSERT(expected[i] != nullptr);
UNIT_ASSERT_VALUES_EQUAL(
TString(DatumGetCString(DirectFunctionCall1(numeric_out, (Datum)addr))),
expected[i]
);
}
}
}

Y_UNIT_TEST(PgConvertNumericInt) {
TArenaMemoryContext arena;

arrow::Int64Builder builder;
builder.Append(11);
builder.Append(3137);
builder.AppendNull();
builder.Append(-1337);
builder.Append(0);

std::shared_ptr<arrow::Array> array;
builder.Finish(&array);

auto result = PgConvertNumeric<i64>(array);
const auto& data = result->data();

const char* expected[] = {
"11", "3137", nullptr, "-1337", "0"
};

NUdf::TStringBlockReader<arrow::BinaryType, true> reader;
for (int i = 0; i < 5; i++) {
auto item = reader.GetItem(*data, i);
if (!item) {
UNIT_ASSERT(expected[i] == nullptr);
} else {
const char* addr = item.AsStringRef().Data() + sizeof(void*);
UNIT_ASSERT(expected[i] != nullptr);
UNIT_ASSERT_VALUES_EQUAL(
TString(DatumGetCString(DirectFunctionCall1(numeric_out, (Datum)addr))),
expected[i]
);
}
}
}

} // Y_UNIT_TEST_SUITE(TArrowUtilsTests)

} // namespace NYql

1 change: 1 addition & 0 deletions ydb/library/yql/parser/pg_wrapper/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ NO_COMPILER_WARNINGS()
INCLUDE(../cflags.inc)

SRCS(
arrow_ut.cpp
codegen_ut.cpp
error_ut.cpp
parser_ut.cpp
Expand Down

0 comments on commit 9150df9

Please sign in to comment.