Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cell maker lib & parsing from json KIKIMR-20673 #1255

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_import_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/io_formats/csv.h>
#include <ydb/core/io_formats/ydb_dump/csv_ydb_dump.h>

#include <ydb/library/actors/core/hfunc.h>

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ PEERDIR(
ydb/core/grpc_services/cancelation
ydb/core/grpc_services/auth_processor
ydb/core/health_check
ydb/core/io_formats
ydb/core/io_formats/ydb_dump
ydb/core/kesus/tablet
ydb/core/kqp/common
ydb/core/protos
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "csv.h"
#include "csv_arrow.h"

#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/serializer/stream.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/value_parsing.h>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
#pragma once

#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/public/lib/scheme_types/scheme_type_id.h>

#include <util/generic/strbuf.h>
#include <util/memory/pool.h>
#include <ydb/core/scheme_types/scheme_type_info.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/csv/api.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>

namespace NKikimr::NFormats {

struct TYdbDump {
// Parse YdbDump-formatted line.
//
// The line is split on ',' and one token is consumed per entry of
// columnOrderTypes, in order. Each entry is (key order, column type):
//   - key order == -1: the parsed cell is appended to `values`;
//   - otherwise: the parsed cell is stored at keys[key order], growing `keys` if needed.
//
// Returns false and fills `strError` when a token is empty, cannot be parsed
// as the column type, or fails the cell value check. On success the size of
// every produced cell is added to `numBytes`. Cell data is allocated from `pool`.
static bool ParseLine(TStringBuf line, const std::vector<std::pair<i32, NScheme::TTypeInfo>>& columnOrderTypes, TMemoryPool& pool,
TVector<TCell>& keys, TVector<TCell>& values, TString& strError, ui64& numBytes);
};

class TArrowCSV {
public:
static constexpr ui32 DEFAULT_BLOCK_SIZE = 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "csv.h"
#include "csv_arrow.h"

#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <library/cpp/testing/unittest/registar.h>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
UNITTEST_FOR(ydb/core/io_formats)
UNITTEST_FOR(ydb/core/io_formats/arrow)

SIZE(SMALL)

PEERDIR(
ydb/core/io_formats
ydb/core/io_formats/arrow

# for NYql::NUdf alloc stuff used in binary_json
ydb/library/yql/public/udf/service/exception_policy
Expand All @@ -13,7 +13,7 @@ PEERDIR(
YQL_LAST_ABI_VERSION()

SRCS(
ut_csv.cpp
csv_arrow_ut.cpp
)

END()
20 changes: 20 additions & 0 deletions ydb/core/io_formats/arrow/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
RECURSE_FOR_TESTS(ut)

LIBRARY()

SRCS(
csv_arrow.cpp
)

CFLAGS(
-Wno-unused-parameter
)

PEERDIR(
ydb/core/scheme_types
ydb/core/formats/arrow
)

YQL_LAST_ABI_VERSION()

END()
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
#include "csv.h"

#include <contrib/libs/double-conversion/double-conversion/double-conversion.h>
#include "cell_maker.h"

#include <ydb/library/binary_json/write.h>
#include <ydb/library/dynumber/dynumber.h>

#include <library/cpp/string_utils/quote/quote.h>

#include <ydb/library/yql/minikql/dom/yson.h>
#include <ydb/library/yql/minikql/dom/json.h>
#include <ydb/library/yql/public/udf/udf_types.h>
#include <ydb/library/yql/public/decimal/yql_decimal.h>
#include <ydb/library/yql/public/udf/udf_types.h>
#include <ydb/library/yql/utils/utf8.h>

#include <contrib/libs/double-conversion/double-conversion/double-conversion.h>
#include <library/cpp/json/json_writer.h>
#include <library/cpp/json/yson/json2yson.h>
#include <library/cpp/string_utils/base64/base64.h>
#include <library/cpp/string_utils/quote/quote.h>

#include <util/datetime/base.h>
#include <util/string/cast.h>

Expand Down Expand Up @@ -178,6 +180,15 @@ namespace {
return false;
}

return Conv(c, t, pool, conv);
}

// Builds a cell from an already-typed value `v`, skipping the text-parsing step.
// The value is passed straight to Conv(), which applies `conv` (Implicit<T, U>
// by default). The TString& error parameter is unnamed and unused here.
static bool MakeDirect(TCell& c, const T& v, TMemoryPool& pool, TString&, TConverter<T, U> conv = &Implicit<T, U>) {
return Conv(c, v, pool, conv);
}

private:
static bool Conv(TCell& c, const T& t, TMemoryPool& pool, TConverter<T, U> conv) {
auto& u = *pool.Allocate<U>();
u = conv(t);
c = TCell(reinterpret_cast<const char*>(&u), sizeof(u));
Expand All @@ -195,10 +206,11 @@ namespace {
return false;
}

const auto u = pool.AppendString(conv(t));
c = TCell(u.data(), u.size());
return Conv(c, t, pool, conv);
}

return true;
// Builds a string-backed cell from an already-typed value `v`, skipping the
// text-parsing step. The value is passed straight to Conv(), which applies
// `conv` (Implicit<T, TStringBuf> by default). The TString& error parameter
// is unnamed and unused here.
static bool MakeDirect(TCell& c, const T& v, TMemoryPool& pool, TString&, TConverter<T, TStringBuf> conv = &Implicit<T, TStringBuf>) {
return Conv(c, v, pool, conv);
}

static bool Make(TCell& c, TStringBuf v, TMemoryPool& pool, TString& err, TConverter<T, TStringBuf> conv, void* parseParam) {
Expand All @@ -207,13 +219,31 @@ namespace {
return false;
}

return Conv(c, t, pool, conv);
}

private:
static bool Conv(TCell& c, const T& t, TMemoryPool& pool, TConverter<T, TStringBuf> conv) {
    // Run the converter, then hand the resulting bytes to the pool; the cell
    // is built from the pool's copy rather than from the converter's result.
    const auto stored = pool.AppendString(conv(t));
    c = TCell(stored.data(), stored.size());
    return true;
}
};

// Writer configuration used when re-serializing JSON values into cells:
// UTF-8 validation is switched off and NaN is written as a string.
NJson::TJsonWriterConfig DefaultJsonConfig() {
    NJson::TJsonWriterConfig config;
    config.WriteNanAsString = true;
    config.ValidateUtf8 = false;
    return config;
}

// Serializes `json` to text using DefaultJsonConfig() and returns the result.
TString WriteJson(const NJson::TJsonValue& json) {
    TStringStream buffer;
    const NJson::TJsonWriterConfig config = DefaultJsonConfig();
    NJson::WriteJson(&buffer, &json, config);
    return buffer.Str();
}

} // anonymous

bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPool& pool, TString& err) {
Expand Down Expand Up @@ -272,6 +302,71 @@ bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPoo
}
}

bool MakeCell(TCell& cell, const NJson::TJsonValue& value, NScheme::TTypeInfo type, TMemoryPool& pool, TString& err) {
if (value.IsNull()) {
return true;
}

try {
switch (type.GetTypeId()) {
case NScheme::NTypeIds::Bool:
return TCellMaker<bool>::MakeDirect(cell, value.GetBooleanSafe(), pool, err);
case NScheme::NTypeIds::Int8:
return TCellMaker<i8>::MakeDirect(cell, value.GetIntegerSafe(), pool, err);
case NScheme::NTypeIds::Uint8:
return TCellMaker<ui8>::MakeDirect(cell, value.GetUIntegerSafe(), pool, err);
case NScheme::NTypeIds::Int16:
return TCellMaker<i16>::MakeDirect(cell, value.GetIntegerSafe(), pool, err);
case NScheme::NTypeIds::Uint16:
return TCellMaker<ui16>::MakeDirect(cell, value.GetUIntegerSafe(), pool, err);
case NScheme::NTypeIds::Int32:
return TCellMaker<i32>::MakeDirect(cell, value.GetIntegerSafe(), pool, err);
case NScheme::NTypeIds::Uint32:
return TCellMaker<ui32>::MakeDirect(cell, value.GetUIntegerSafe(), pool, err);
case NScheme::NTypeIds::Int64:
return TCellMaker<i64>::MakeDirect(cell, value.GetIntegerSafe(), pool, err);
case NScheme::NTypeIds::Uint64:
return TCellMaker<ui64>::MakeDirect(cell, value.GetUIntegerSafe(), pool, err);
case NScheme::NTypeIds::Float:
return TCellMaker<float>::MakeDirect(cell, value.GetDoubleSafe(), pool, err);
case NScheme::NTypeIds::Double:
return TCellMaker<double>::MakeDirect(cell, value.GetDoubleSafe(), pool, err);
case NScheme::NTypeIds::Date:
return TCellMaker<TInstant, ui16>::Make(cell, value.GetStringSafe(), pool, err, &Days);
case NScheme::NTypeIds::Datetime:
return TCellMaker<TInstant, ui32>::Make(cell, value.GetStringSafe(), pool, err, &Seconds);
case NScheme::NTypeIds::Timestamp:
return TCellMaker<TInstant, ui64>::Make(cell, value.GetStringSafe(), pool, err, &MicroSeconds);
case NScheme::NTypeIds::Interval:
return TCellMaker<i64>::MakeDirect(cell, value.GetIntegerSafe(), pool, err);
case NScheme::NTypeIds::String:
case NScheme::NTypeIds::String4k:
case NScheme::NTypeIds::String2m:
return TCellMaker<TString, TStringBuf>::MakeDirect(cell, Base64Decode(value.GetStringSafe()), pool, err);
case NScheme::NTypeIds::Utf8:
return TCellMaker<TString, TStringBuf>::MakeDirect(cell, value.GetStringSafe(), pool, err);
case NScheme::NTypeIds::Yson:
return TCellMaker<TString, TStringBuf>::MakeDirect(cell, NJson2Yson::SerializeJsonValueAsYson(value), pool, err);
case NScheme::NTypeIds::Json:
return TCellMaker<TString, TStringBuf>::MakeDirect(cell, NFormats::WriteJson(value), pool, err);
case NScheme::NTypeIds::JsonDocument:
if (const auto& result = NBinaryJson::SerializeToBinaryJson(NFormats::WriteJson(value))) {
return TCellMaker<TMaybe<NBinaryJson::TBinaryJson>, TStringBuf>::MakeDirect(cell, result, pool, err, &BinaryJsonToStringBuf);
} else {
return false;
}
case NScheme::NTypeIds::DyNumber:
return TCellMaker<TMaybe<TString>, TStringBuf>::Make(cell, value.GetStringSafe(), pool, err, &DyNumberToStringBuf);
case NScheme::NTypeIds::Decimal:
return TCellMaker<NYql::NDecimal::TInt128, std::pair<ui64, ui64>>::Make(cell, value.GetStringSafe(), pool, err, &Int128ToPair);
default:
return false;
}
} catch (const yexception&) {
return false;
}
}

bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type) {
if (cell.IsNull()) {
return true;
Expand Down Expand Up @@ -317,45 +412,4 @@ bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type) {
}
}

// Parses one YdbDump-formatted line: consumes one comma-separated token per
// entry of columnOrderTypes and converts it into a cell of that entry's type.
// Key columns (key order != -1) land in keys[key order]; all other columns
// are appended to `values`. On any failure `strError` is set and false is
// returned; on success `numBytes` grows by the size of every produced cell.
bool TYdbDump::ParseLine(TStringBuf line, const std::vector<std::pair<i32, NScheme::TTypeInfo>>& columnOrderTypes, TMemoryPool& pool,
        TVector<TCell>& keys, TVector<TCell>& values, TString& strError, ui64& numBytes)
{
    for (const auto& [keyOrder, pType] : columnOrderTypes) {
        const TStringBuf token = line.NextTok(',');
        if (token.empty()) {
            strError = "Empty token";
            return false;
        }

        const bool isKeyColumn = (keyOrder != -1);

        // Make sure the key slot exists before taking its address.
        if (isKeyColumn && static_cast<int>(keys.size()) < (keyOrder + 1)) {
            keys.resize(keyOrder + 1);
        }

        TCell* const target = isKeyColumn ? &keys.at(keyOrder) : &values.emplace_back();
        Y_ABORT_UNLESS(target);

        TString parseError;
        const bool parsed = MakeCell(*target, token, pType, pool, parseError);
        if (!parsed) {
            strError = TStringBuilder() << "Value parse error: '" << token << "' " << parseError;
            return false;
        }

        const bool valid = CheckCellValue(*target, pType);
        if (!valid) {
            strError = TStringBuilder() << "Value check error: '" << token << "'";
            return false;
        }

        numBytes += target->Size();
    }

    return true;
}

}
18 changes: 18 additions & 0 deletions ydb/core/io_formats/cell_maker/cell_maker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/core/scheme_types/scheme_type_info.h>

#include <library/cpp/json/json_value.h>

#include <util/generic/strbuf.h>
#include <util/generic/string.h>
#include <util/memory/pool.h>

namespace NKikimr::NFormats {

bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPool& pool, TString& err);
bool MakeCell(TCell& cell, const NJson::TJsonValue& value, NScheme::TTypeInfo type, TMemoryPool& pool, TString& err);
bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type);

}
25 changes: 25 additions & 0 deletions ydb/core/io_formats/cell_maker/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
LIBRARY()

SRCS(
cell_maker.cpp
)

PEERDIR(
ydb/core/scheme
ydb/core/scheme_types
ydb/library/binary_json
ydb/library/dynumber
ydb/library/yql/minikql/dom
ydb/library/yql/public/decimal
ydb/library/yql/public/udf
ydb/library/yql/utils
contrib/libs/double-conversion
library/cpp/json
library/cpp/json/yson
library/cpp/string_utils/base64
library/cpp/string_utils/quote
)

YQL_LAST_ABI_VERSION()

END()
33 changes: 6 additions & 27 deletions ydb/core/io_formats/ya.make
Original file line number Diff line number Diff line change
@@ -1,30 +1,9 @@
RECURSE_FOR_TESTS(ut)

LIBRARY()

SRCS(
csv_ydb_dump.cpp
csv_arrow.cpp
)

CFLAGS(
-Wno-unused-parameter
RECURSE(
arrow
cell_maker
ydb_dump
)

PEERDIR(
contrib/libs/double-conversion
library/cpp/string_utils/quote
ydb/core/formats/arrow
ydb/core/scheme
ydb/library/binary_json
ydb/library/dynumber
ydb/library/yql/minikql/dom
ydb/library/yql/public/decimal
ydb/library/yql/public/udf
ydb/library/yql/utils
ydb/public/lib/scheme_types
RECURSE_FOR_TESTS(
arrow
)

YQL_LAST_ABI_VERSION()

END()
Loading
Loading