From 9150df9933a5db77e51432e46b3ea63c7a8c124d Mon Sep 17 00:00:00 2001 From: Alexey Ozeritskiy Date: Thu, 28 Dec 2023 13:27:53 +0100 Subject: [PATCH] Implement double/int -> pg numeric conversion YQL-16767 (#727) --- ydb/library/yql/parser/pg_wrapper/arrow.cpp | 53 +++++++++ ydb/library/yql/parser/pg_wrapper/arrow.h | 1 + .../yql/parser/pg_wrapper/arrow_impl.h | 48 ++++++++ .../yql/parser/pg_wrapper/ut/arrow_ut.cpp | 112 ++++++++++++++++++ ydb/library/yql/parser/pg_wrapper/ut/ya.make | 1 + 5 files changed, 215 insertions(+) create mode 100644 ydb/library/yql/parser/pg_wrapper/arrow_impl.h create mode 100644 ydb/library/yql/parser/pg_wrapper/ut/arrow_ut.cpp diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.cpp b/ydb/library/yql/parser/pg_wrapper/arrow.cpp index 3c33570d8142..5a8559ff8f2d 100644 --- a/ydb/library/yql/parser/pg_wrapper/arrow.cpp +++ b/ydb/library/yql/parser/pg_wrapper/arrow.cpp @@ -1,4 +1,5 @@ #include "arrow.h" +#include "arrow_impl.h" #include #include #include @@ -151,6 +152,55 @@ std::shared_ptr PgConvertString(const std::shared_ptr 0 && fracInt % 10 == 0) { + fracInt /= 10; + digits -= 1; + } + + if (digits == 0) { + return int64_to_numeric(intPart); + } else { + return numeric_add_opt_error( + int64_to_numeric(intPart), + int64_div_fast_to_numeric(fracInt, digits), + &error); + } +} + +TColumnConverter BuildPgNumericColumnConverter(const std::shared_ptr& originalType) { + switch (originalType->id()) { + case arrow::Type::INT16: + return [](const std::shared_ptr& value) { + return PgConvertNumeric(value); + }; + case arrow::Type::INT32: + return [](const std::shared_ptr& value) { + return PgConvertNumeric(value); + }; + case arrow::Type::INT64: + return [](const std::shared_ptr& value) { + return PgConvertNumeric(value); + }; + case arrow::Type::FLOAT: + return [](const std::shared_ptr& value) { + return PgConvertNumeric(value); + }; + case arrow::Type::DOUBLE: + return [](const std::shared_ptr& value) { + return PgConvertNumeric(value); + }; + default: + return {}; + } +} + template TColumnConverter BuildPgFixedColumnConverter(const std::shared_ptr& originalType, const F& f) { auto primaryType = NKikimr::NMiniKQL::GetPrimitiveDataType(); @@ -200,6 +250,9 @@ TColumnConverter BuildPgColumnConverter(const std::shared_ptr& case FLOAT8OID: { return BuildPgFixedColumnConverter(originalType, [](auto value){ return Float8GetDatum(value); }); } + case NUMERICOID: { + return BuildPgNumericColumnConverter(originalType); + } case BYTEAOID: case VARCHAROID: case TEXTOID: diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.h b/ydb/library/yql/parser/pg_wrapper/arrow.h index 1ef0ead7c554..7bd0cfcdd98d 100644 --- a/ydb/library/yql/parser/pg_wrapper/arrow.h +++ b/ydb/library/yql/parser/pg_wrapper/arrow.h @@ -1299,3 +1299,4 @@ TExecFunc FindExec(Oid oid); const NPg::TAggregateDesc& ResolveAggregation(const TString& name, NKikimr::NMiniKQL::TTupleType* tupleType, const std::vector& argsColumns, NKikimr::NMiniKQL::TType* returnType); } + diff --git a/ydb/library/yql/parser/pg_wrapper/arrow_impl.h b/ydb/library/yql/parser/pg_wrapper/arrow_impl.h new file mode 100644 index 000000000000..7574aada429f --- /dev/null +++ b/ydb/library/yql/parser/pg_wrapper/arrow_impl.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include + +extern "C" { +#include "utils/numeric.h" +} + +namespace NYql { + +Numeric PgFloatToNumeric(double item, ui64 scale, int digits); + +template +std::shared_ptr PgConvertNumeric(const std::shared_ptr& value) { + TArenaMemoryContext arena; + const auto& data = value->data(); + size_t length = data->length; + arrow::BinaryBuilder builder; + auto input = data->GetValues(1); + for (size_t i = 0; i < length; ++i) { + if (value->IsNull(i)) { + builder.AppendNull(); + continue; + } + T item = input[i]; + Numeric v; + if constexpr(std::is_same_v) { + v = PgFloatToNumeric(item, 1000000000000LL, 12); + } else if constexpr(std::is_same_v) { + v = PgFloatToNumeric(item, 1000000LL, 6); + } else { + v = int64_to_numeric(item); + } + auto datum = NumericGetDatum(v); + auto ptr = (char*)datum; + auto len = GetFullVarSize((const text*)datum); + NUdf::ZeroMemoryContext(ptr); + ARROW_OK(builder.Append(ptr - sizeof(void*), len + sizeof(void*))); + } + + std::shared_ptr ret; + ARROW_OK(builder.Finish(&ret)); + return ret; +} + +} + diff --git a/ydb/library/yql/parser/pg_wrapper/ut/arrow_ut.cpp b/ydb/library/yql/parser/pg_wrapper/ut/arrow_ut.cpp new file mode 100644 index 000000000000..c09359f40523 --- /dev/null +++ b/ydb/library/yql/parser/pg_wrapper/ut/arrow_ut.cpp @@ -0,0 +1,112 @@ +#include +#include + +#include + +#include "arrow.h" +#include "arrow_impl.h" + +extern "C" { +#include "utils/fmgrprotos.h" +} + +namespace NYql { + +Y_UNIT_TEST_SUITE(TArrowUtilsTests) { + +Y_UNIT_TEST(TestPgFloatToNumeric) { + TArenaMemoryContext arena; + auto n = PgFloatToNumeric(711.56, 1000000000000LL, 12); + auto value = TString(DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(n)))); + UNIT_ASSERT_VALUES_EQUAL(value, "711.56"); + + n = PgFloatToNumeric(-711.56, 1000000000000LL, 12); + value = TString(DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(n)))); + UNIT_ASSERT_VALUES_EQUAL(value, "-711.56"); + + n = PgFloatToNumeric(711.56f, 100000LL, 5); + value = TString(DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(n)))); + UNIT_ASSERT_VALUES_EQUAL(value, "711.56"); + + n = PgFloatToNumeric(-711.56f, 100000LL, 5); + value = TString(DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(n)))); + UNIT_ASSERT_VALUES_EQUAL(value, "-711.56"); +} + + +Y_UNIT_TEST(PgConvertNumericDouble) { + TArenaMemoryContext arena; + + arrow::DoubleBuilder builder; + builder.Append(1.1); + builder.Append(31.37); + builder.AppendNull(); + builder.Append(-1.337); + builder.Append(0.0); + + std::shared_ptr array; + builder.Finish(&array); + + auto result = PgConvertNumeric(array); + const auto& data = result->data(); + + const char* expected[] = { + "1.1", "31.37", nullptr, "-1.337", "0" + }; + + NUdf::TStringBlockReader reader; + for (int i = 0; i < 5; i++) { + auto item = reader.GetItem(*data, i); + if (!item) { + UNIT_ASSERT(expected[i] == nullptr); + } else { + const char* addr = item.AsStringRef().Data() + sizeof(void*); + UNIT_ASSERT(expected[i] != nullptr); + UNIT_ASSERT_VALUES_EQUAL( + TString(DatumGetCString(DirectFunctionCall1(numeric_out, (Datum)addr))), + expected[i] + ); + } + } +} + +Y_UNIT_TEST(PgConvertNumericInt) { + TArenaMemoryContext arena; + + arrow::Int64Builder builder; + builder.Append(11); + builder.Append(3137); + builder.AppendNull(); + builder.Append(-1337); + builder.Append(0); + + std::shared_ptr array; + builder.Finish(&array); + + auto result = PgConvertNumeric(array); + const auto& data = result->data(); + + const char* expected[] = { + "11", "3137", nullptr, "-1337", "0" + }; + + NUdf::TStringBlockReader reader; + for (int i = 0; i < 5; i++) { + auto item = reader.GetItem(*data, i); + if (!item) { + UNIT_ASSERT(expected[i] == nullptr); + } else { + const char* addr = item.AsStringRef().Data() + sizeof(void*); + UNIT_ASSERT(expected[i] != nullptr); + UNIT_ASSERT_VALUES_EQUAL( + TString(DatumGetCString(DirectFunctionCall1(numeric_out, (Datum)addr))), + expected[i] + ); + } + } +} + +} // Y_UNIT_TEST_SUITE(TArrowUtilsTests) + +} // namespace NYql + diff --git a/ydb/library/yql/parser/pg_wrapper/ut/ya.make b/ydb/library/yql/parser/pg_wrapper/ut/ya.make index c4a443707204..848e33acfdf0 100644 --- a/ydb/library/yql/parser/pg_wrapper/ut/ya.make +++ b/ydb/library/yql/parser/pg_wrapper/ut/ya.make @@ -8,6 +8,7 @@ NO_COMPILER_WARNINGS() INCLUDE(../cflags.inc) SRCS( + arrow_ut.cpp codegen_ut.cpp error_ut.cpp parser_ut.cpp