Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s3 arrow converters have been added YQ-2570 #1249

Merged
merged 1 commit into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ ADDINCL(
YQL_LAST_ABI_VERSION()

SRCS(
yql_arrow_column_converters.cpp
yql_s3_actors_util.cpp
yql_s3_applicator_actor.cpp
yql_s3_sink_factory.cpp
Expand Down
211 changes: 211 additions & 0 deletions ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
#include "yql_arrow_column_converters.h"

#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_binary.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/cast.h>
#include <contrib/libs/apache/arrow/cpp/src/parquet/exception.h>

#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
#include <ydb/library/yql/parser/pg_catalog/catalog.h>
#include <ydb/library/yql/public/udf/arrow/block_builder.h>
#include <ydb/library/yql/public/udf/arrow/block_item.h>
#include <ydb/library/yql/public/udf/arrow/block_reader.h>
#include <ydb/library/yql/utils/yql_panic.h>

#ifdef THROW
#undef THROW
#endif

#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"

#include <ydb/library/yql/udfs/common/clickhouse/client/src/Functions/FunctionsConversion.h>

#pragma clang diagnostic pop

namespace {

#define THROW_ARROW_NOT_OK(status) \
do \
{ \
if (::arrow::Status _s = (status); !_s.ok()) \
throw yexception() << _s.ToString(); \
} while (false)

using namespace NYql;
using namespace NKikimr::NMiniKQL;

// Converts an arrow Date32 column (i32 = days since Unix epoch) into a YQL
// Date block (ui16 days). Values outside [0, MAX_DATE] raise
// parquet::ParquetException. A null item is copied through when isOptional;
// in the non-optional instantiation a null raises ParquetException instead.
template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
    ::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
    ::NYql::NUdf::TFixedSizeBlockReader<i32, isOptional> reader;
    for (i64 i = 0; i < value->length(); ++i) {
        const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
        if constexpr (isOptional) {
            if (!item) {
                // Propagate null unchanged into the output block.
                builder.Add(item);
                continue;
            }
        } else if (!item) {
            throw parquet::ParquetException(TStringBuilder() << "null value for date could not be represented in non-optional type");
        }

        const i32 v = item.As<i32>();
        if (v < 0 || v > ::NYql::NUdf::MAX_DATE) {
            throw parquet::ParquetException(TStringBuilder() << "date in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATE << "]: " << v);
        }
        builder.Add(NUdf::TBlockItem(static_cast<ui16>(v)));
    }
    return builder.Build(true).make_array();
}

// Type-erasing wrapper: binds the target arrow type and optionality flag,
// and dispatches each incoming batch to the matching template instantiation.
TColumnConverter ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& batch) {
        if (isOptional) {
            return ArrowDate32AsYqlDate<true>(targetType, batch);
        }
        return ArrowDate32AsYqlDate<false>(targetType, batch);
    };
}

// Parses an arrow BINARY (string) column into a YQL Datetime block (ui32,
// seconds) via ClickHouse's DataTypeDateTime text parser, honoring the
// supplied FormatSettings (ISO / POSIX / custom format). A null item is
// copied through when isOptional; in the non-optional instantiation a null
// raises parquet::ParquetException instead.
template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowStringAsYqlDateTime(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
    ::NYql::NUdf::TFixedSizeArrayBuilder<ui32, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
    ::NYql::NUdf::TStringBlockReader<arrow::BinaryType, isOptional, NKikimr::NUdf::EDataSlot::String> reader;
    for (i64 i = 0; i < value->length(); ++i) {
        NUdf::TBlockItem item = reader.GetItem(*value->data(), i);

        if constexpr (isOptional) {
            if (!item) {
                // Propagate null unchanged into the output block.
                builder.Add(item);
                continue;
            }
        } else if (!item) {
            // Fixed copy-pasted message: this converter produces datetimes, not dates.
            throw parquet::ParquetException(TStringBuilder() << "null value for datetime could not be represented in non-optional type");
        }

        auto ref = item.AsStringRef();
        NDB::ReadBufferFromMemory rb{ref.Data(), ref.Size()};
        uint32_t result = 0;
        parseImpl<NDB::DataTypeDateTime>(result, rb, nullptr, formatSettings);
        builder.Add(NUdf::TBlockItem(static_cast<ui32>(result)));
    }
    return builder.Build(true).make_array();
}

// Type-erasing wrapper for the datetime parser: binds target type,
// optionality and the text-parsing settings, dispatching per batch.
TColumnConverter ArrowStringAsYqlDateTime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, const NDB::FormatSettings& formatSettings) {
    return [targetType, isOptional, formatSettings](const std::shared_ptr<arrow::Array>& batch) {
        if (isOptional) {
            return ArrowStringAsYqlDateTime<true>(targetType, batch, formatSettings);
        }
        return ArrowStringAsYqlDateTime<false>(targetType, batch, formatSettings);
    };
}

// Parses an arrow BINARY (string) column into a YQL Timestamp block (ui64)
// via ClickHouse's 64-bit datetime text reader, honoring the supplied
// FormatSettings. A null item is copied through when isOptional; in the
// non-optional instantiation a null raises parquet::ParquetException instead.
template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
    ::NYql::NUdf::TFixedSizeArrayBuilder<ui64, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
    ::NYql::NUdf::TStringBlockReader<arrow::BinaryType, isOptional, NKikimr::NUdf::EDataSlot::String> reader;
    for (i64 i = 0; i < value->length(); ++i) {
        NUdf::TBlockItem item = reader.GetItem(*value->data(), i);

        if constexpr (isOptional) {
            if (!item) {
                // Propagate null unchanged into the output block.
                builder.Add(item);
                continue;
            }
        } else if (!item) {
            // Fixed copy-pasted message: this converter produces timestamps, not dates.
            throw parquet::ParquetException(TStringBuilder() << "null value for timestamp could not be represented in non-optional type");
        }

        auto ref = item.AsStringRef();
        NDB::ReadBufferFromMemory rb{ref.Data(), ref.Size()};
        NDB::DateTime64 result = 0;
        // scale 0: parse whole seconds; server time zone from DateLUT.
        readTextTimestamp64(result, 0, rb, DateLUT::instance(), formatSettings);
        builder.Add(NUdf::TBlockItem(static_cast<ui64>(result)));
    }
    return builder.Build(true).make_array();
}

// Type-erasing wrapper for the timestamp parser: binds target type,
// optionality and the text-parsing settings, dispatching per batch.
TColumnConverter ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, const NDB::FormatSettings& formatSettings) {
    return [targetType, isOptional, formatSettings](const std::shared_ptr<arrow::Array>& batch) {
        if (isOptional) {
            return ArrowStringAsYqlTimestamp<true>(targetType, batch, formatSettings);
        }
        return ArrowStringAsYqlTimestamp<false>(targetType, batch, formatSettings);
    };
}

// Selects a hand-written converter for (arrow source type, YQL target slot)
// pairs that arrow's generic Cast does not handle: DATE32 -> Date, and
// BINARY strings -> Datetime/Timestamp (text parsing per formatSettings).
// Returns an empty TColumnConverter when no custom conversion applies, so
// the caller can fall back to arrow::compute::Cast.
TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType, const NDB::FormatSettings& formatSettings) {
    // TODO: support more than 1 optional level
    bool isOptional = false;
    auto unpackedYqlType = UnpackOptional(yqlType, isOptional);
    if (!unpackedYqlType->IsData()) {
        return {};
    }

    auto slot = AS_TYPE(TDataType, unpackedYqlType)->GetDataSlot();
    if (!slot) {
        return {};
    }

    // Dead `return {};` statements after the inner switches removed: every
    // case (including default) already returns.
    switch (originalType->id()) {
    case arrow::Type::DATE32:
        switch (*slot) {
        case NUdf::EDataSlot::Date:
            return ArrowDate32AsYqlDate(targetType, isOptional);
        default:
            return {};
        }
    case arrow::Type::BINARY:
        switch (*slot) {
        case NUdf::EDataSlot::Datetime:
            return ArrowStringAsYqlDateTime(targetType, isOptional, formatSettings);
        case NUdf::EDataSlot::Timestamp:
            return ArrowStringAsYqlTimestamp(targetType, isOptional, formatSettings);
        default:
            return {};
        }
    default:
        return {};
    }
}

}

namespace NYql::NDq {

// Builds a converter from the arrow type actually present in the source
// (originalType) to the arrow type the query expects (targetType).
// Resolution order:
//   1. PG types -> BuildPgColumnConverter (throws if incompatible);
//   2. custom YQL converters (see BuildCustomConverter);
//   3. empty converter when the types already match (column used as-is);
//   4. generic arrow::compute::Cast, castability checked up-front.
// columnName is used only in error messages.
TColumnConverter BuildColumnConverter(const std::string& columnName, const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType, const NDB::FormatSettings& formatSettings) {
    if (yqlType->IsPg()) {
        auto pgType = AS_TYPE(TPgType, yqlType);
        auto conv = BuildPgColumnConverter(originalType, pgType);
        if (!conv) {
            ythrow yexception() << "Arrow type: " << originalType->ToString() <<
                " of field: " << columnName << " isn't compatible to PG type: " << NPg::LookupType(pgType->GetTypeId()).Name;
        }

        return conv;
    }

    if (auto customConverter = BuildCustomConverter(originalType, targetType, yqlType, formatSettings); customConverter) {
        return customConverter;
    }

    if (targetType->Equals(originalType)) {
        // Empty converter: caller keeps the column unchanged.
        return {};
    }

    YQL_ENSURE(arrow::compute::CanCast(*originalType, *targetType), "Mismatch type for field: " << columnName << ", expected: "
        << targetType->ToString() << ", got: " << originalType->ToString());


    return [targetType](const std::shared_ptr<arrow::Array>& value) {
        auto res = arrow::compute::Cast(*value, targetType);
        THROW_ARROW_NOT_OK(res.status());
        return std::move(res).ValueOrDie();
    };
}

} // namespace NYql::NDq
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include <ydb/library/yql/parser/pg_wrapper/interface/arrow.h>
#include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatSettings.h>

namespace NYql::NDq {

// Builds a converter from the arrow type actually present in the source
// file (originalType) to the arrow type the query expects (targetType).
// columnName is used for error reporting; yqlType selects custom (non-cast)
// conversions; formatSettings controls date/time text parsing.
TColumnConverter BuildColumnConverter(
    const std::string& columnName,
    const std::shared_ptr<arrow::DataType>& originalType,
    const std::shared_ptr<arrow::DataType>& targetType,
    NKikimr::NMiniKQL::TType* yqlType,
    const NDB::FormatSettings& formatSettings);

} // namespace NYql::NDq
75 changes: 3 additions & 72 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@

#endif

#include "yql_arrow_column_converters.h"
#include "yql_s3_actors_util.h"
#include "yql_s3_read_actor.h"
#include "yql_s3_source_factory.h"
#include "yql_s3_actors_util.h"

#include <ydb/library/services/services.pb.h>

Expand Down Expand Up @@ -1122,76 +1123,6 @@ void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSyste
inflightCounter);
}

// NOTE(review): deletion side of this PR's diff — this in-file copy is
// removed and superseded by the identical ArrowDate32AsYqlDate in
// yql_arrow_column_converters.cpp.
template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
    ::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
    ::NYql::NUdf::TFixedSizeBlockReader<i32, isOptional> reader;
    for (i64 i = 0; i < value->length(); ++i) {
        const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
        if constexpr (isOptional) {
            if (!item) {
                builder.Add(item);
                continue;
            }
        } else if (!item) {
            throw parquet::ParquetException(TStringBuilder() << "null value for date could not be represented in non-optional type");
        }

        const i32 v = item.As<i32>();
        if (v < 0 || v > ::NYql::NUdf::MAX_DATE) {
            throw parquet::ParquetException(TStringBuilder() << "date in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATE << "]: " << v);
        }
        builder.Add(NUdf::TBlockItem(static_cast<ui16>(v)));
    }
    return builder.Build(true).make_array();
}

// NOTE(review): deletion side of this PR's diff — this converter is removed
// and replaced by the overload in yql_arrow_column_converters.cpp that also
// accepts NDB::FormatSettings and supports Datetime/Timestamp parsing.
TColumnConverter BuildColumnConverter(const std::string& columnName, const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType) {
    if (yqlType->IsPg()) {
        auto pgType = AS_TYPE(TPgType, yqlType);
        auto conv = BuildPgColumnConverter(originalType, pgType);
        if (!conv) {
            ythrow yexception() << "Arrow type: " << originalType->ToString() <<
                " of field: " << columnName << " isn't compatible to PG type: " << NPg::LookupType(pgType->GetTypeId()).Name;
        }

        return conv;
    }

    if (originalType->id() == arrow::Type::DATE32) {
        // TODO: support more than 1 optional level
        bool isOptional = false;
        auto unpackedYqlType = UnpackOptional(yqlType, isOptional);

        // arrow date -> yql date
        if (unpackedYqlType->IsData()) {
            auto slot = AS_TYPE(TDataType, unpackedYqlType)->GetDataSlot();
            if (slot == NUdf::EDataSlot::Date) {
                return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
                    if (isOptional) {
                        return ArrowDate32AsYqlDate<true>(targetType, value);
                    }
                    return ArrowDate32AsYqlDate<false>(targetType, value);
                };
            }
        }
    }

    if (targetType->Equals(originalType)) {
        return {};
    }

    YQL_ENSURE(arrow::compute::CanCast(*originalType, *targetType), "Mismatch type for field: " << columnName << ", expected: "
        << targetType->ToString() << ", got: " << originalType->ToString());


    return [targetType](const std::shared_ptr<arrow::Array>& value) {
        auto res = arrow::compute::Cast(*value, targetType);
        THROW_ARROW_NOT_OK(res.status());
        return std::move(res).ValueOrDie();
    };
}

std::shared_ptr<arrow::RecordBatch> ConvertArrowColumns(std::shared_ptr<arrow::RecordBatch> batch, std::vector<TColumnConverter>& columnConverters) {
auto columns = batch->columns();
for (size_t i = 0; i < columnConverters.size(); ++i) {
Expand Down Expand Up @@ -1571,7 +1502,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
columnIndices.push_back(srcFieldIndex);
auto rowSpecColumnIt = ReadSpec->RowSpec.find(targetField->name());
YQL_ENSURE(rowSpecColumnIt != ReadSpec->RowSpec.end(), "Column " << targetField->name() << " not found in row spec");
columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second));
columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second, ReadSpec->Settings));
}
}

Expand Down
16 changes: 11 additions & 5 deletions ydb/tests/fq/s3/test_format_setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def create_sink_date_time_binding(self, client, connection_id, prefix, type_form
("timestamp/simple_iso/test.csv", "csv_with_names"),
("timestamp/simple_iso/test.tsv", "tsv_with_names"),
("timestamp/simple_iso/test.json", "json_each_row"),
("timestamp/simple_iso/test.parquet", "parquet")
])
def test_timestamp_simple_iso(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down Expand Up @@ -357,6 +358,7 @@ def test_timestamp_simple_iso_insert(self, kikimr, s3, client, filename, type_fo
("common/simple_posix/test.csv", "csv_with_names"),
("common/simple_posix/test.tsv", "tsv_with_names"),
("common/simple_posix/test.json", "json_each_row"),
("common/simple_posix/test.parquet", "parquet")
])
def test_timestamp_simple_posix(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand All @@ -380,7 +382,7 @@ def test_timestamp_simple_posix(self, kikimr, s3, client, filename, type_format)
@pytest.mark.parametrize("filename, type_format", [
("common/simple_posix/test.csv", "csv_with_names"),
("common/simple_posix/test.tsv", "tsv_with_names"),
("common/simple_posix/test.json", "json_each_row"),
("common/simple_posix/test.json", "json_each_row")
])
def test_timestamp_simple_posix_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand All @@ -404,7 +406,8 @@ def test_timestamp_simple_posix_insert(self, kikimr, s3, client, filename, type_
@pytest.mark.parametrize("filename, type_format", [
("date_time/simple_iso/test.csv", "csv_with_names"),
("date_time/simple_iso/test.tsv", "tsv_with_names"),
("date_time/simple_iso/test.json", "json_each_row")
("date_time/simple_iso/test.json", "json_each_row"),
("date_time/simple_iso/test.parquet", "parquet")
])
def test_date_time_simple_iso(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down Expand Up @@ -452,7 +455,8 @@ def test_date_time_simple_iso_insert(self, kikimr, s3, client, filename, type_fo
@pytest.mark.parametrize("filename, type_format", [
("common/simple_posix/test.csv", "csv_with_names"),
("common/simple_posix/test.tsv", "tsv_with_names"),
("common/simple_posix/test.json", "json_each_row")
("common/simple_posix/test.json", "json_each_row"),
("common/simple_posix/test.parquet", "parquet")
])
def test_date_time_simple_posix(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand All @@ -476,7 +480,8 @@ def test_date_time_simple_posix(self, kikimr, s3, client, filename, type_format)
@pytest.mark.parametrize("filename, type_format", [
("common/simple_posix/test.csv", "csv_with_names"),
("common/simple_posix/test.tsv", "tsv_with_names"),
("common/simple_posix/test.json", "json_each_row")
("common/simple_posix/test.json", "json_each_row"),
("common/simple_posix/test.parquet", "parquet")
])
def test_date_time_simple_posix_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down Expand Up @@ -549,7 +554,8 @@ def test_timestamp_simple_format_insert(self, kikimr, s3, client, filename, type
@pytest.mark.parametrize("filename, type_format", [
("common/simple_format/test.csv", "csv_with_names"),
("common/simple_format/test.tsv", "tsv_with_names"),
("common/simple_format/test.json", "json_each_row")
("common/simple_format/test.json", "json_each_row"),
("common/simple_format/test.parquet", "parquet")
])
def test_date_time_simple_format_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down
Loading