Merging inference updates #7929

Merged
11 changes: 6 additions & 5 deletions ydb/core/external_sources/object_storage.cpp
@@ -333,7 +333,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
for (const auto& entry : entries.Objects) {
if (entry.Size > 0) {
return entry.Path;
return entry;
}
}
throw yexception() << "couldn't find any files for type inference, please check that the right path is provided";
@@ -349,30 +349,31 @@
meta->Attributes.erase("withinfer");

auto fileFormat = NObjectStorage::NInference::ConvertFileFormat(*format);
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat));
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat, meta->Attributes));
auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId, fileFormat, meta->Attributes));

return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {
return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<NYql::NS3Lister::TObjectListEntry>& entryFut) {
auto promise = NThreading::NewPromise<TMetadataResult>();
auto schemaToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) {
if (!response.Status.IsSuccess()) {
metaPromise.SetValue(NYql::NCommon::ResultFromError<TMetadataResult>(response.Status.GetIssues()));
return;
}
TMetadataResult result;
meta->Changed = true;
meta->Schema.clear_column();
for (const auto& column : response.Fields) {
auto& destColumn = *meta->Schema.add_column();
destColumn = column;
}
TMetadataResult result;
result.SetSuccess();
result.Metadata = meta;
metaPromise.SetValue(std::move(result));
};
auto [path, size, _] = entryFut.GetValue();
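// Forward both the path and the object size: Parquet schema inference reads the file
// footer, whose location is computed from the size.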
actorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvInferFileSchema, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
arrowInferencinatorId,
new NObjectStorage::TEvInferFileSchema(TString{pathFut.GetValue()}),
new NObjectStorage::TEvInferFileSchema(TString{path}, size),
promise,
std::move(schemaToMetadata)
));
4 changes: 3 additions & 1 deletion ydb/core/external_sources/object_storage/events.h
@@ -119,11 +119,13 @@ struct TEvArrowFile : public NActors::TEventLocal<TEvArrowFile, EvArrowFile> {
};

struct TEvInferFileSchema : public NActors::TEventLocal<TEvInferFileSchema, EvInferFileSchema> {
explicit TEvInferFileSchema(TString&& path)
explicit TEvInferFileSchema(TString&& path, ui64 size)
: Path{std::move(path)}
, Size{size}
{}

TString Path;
ui64 Size = 0;
};

struct TEvInferredFileSchema : public NActors::TEventLocal<TEvInferredFileSchema, EvInferredFileSchema> {
142 changes: 132 additions & 10 deletions ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp
@@ -6,24 +6,32 @@
#include <arrow/csv/chunker.h>
#include <arrow/csv/options.h>
#include <arrow/io/memory.h>
#include <arrow/util/endian.h>

#include <util/generic/guid.h>
#include <util/generic/size_literals.h>

#include <ydb/core/external_sources/object_storage/events.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/yql/providers/s3/compressors/factory.h>
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBufferFromString.h>

namespace NKikimr::NExternalSource::NObjectStorage::NInference {

class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher> {
static constexpr uint64_t PrefixSize = 10_MB;
public:
TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format)
TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params)
: S3FetcherId_{s3FetcherId}
, Format_{format}
{
Y_ABORT_UNLESS(IsArrowInferredFormat(Format_));

auto decompression = params.FindPtr("compression");
if (decompression) {
DecompressionFormat_ = *decompression;
}
}

void Bootstrap() {
@@ -40,15 +48,20 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
const auto& request = *ev->Get();
TRequest localRequest{
.Path = request.Path,
.RequestId = {},
.RequestId = TGUID::Create(),
.Requester = ev->Sender,
.MetadataRequest = false,
};
CreateGuid(&localRequest.RequestId);

switch (Format_) {
case EFileFormat::CsvWithNames:
case EFileFormat::TsvWithNames: {
HandleAsPrefixFile(std::move(localRequest), ctx);
RequestPartialFile(std::move(localRequest), ctx, 0, 10_MB);
break;
}
case EFileFormat::Parquet: {
localRequest.MetadataRequest = true;
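// A Parquet file ends with FileMetaData, a 4-byte little-endian metadata length and the
// "PAR1" magic; fetch just the length field first, i.e. bytes [Size - 8, Size - 4).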
RequestPartialFile(std::move(localRequest), ctx, request.Size - 8, request.Size - 4);
break;
}
default: {
@@ -67,6 +80,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>

const auto& request = requestIt->second;

TString data = std::move(response.Data);
if (DecompressionFormat_) {
auto decompressedData = DecompressFile(data, request, ctx);
if (!decompressedData) {
return;
}
data = std::move(*decompressedData);
}

std::shared_ptr<arrow::io::RandomAccessFile> file;
switch (Format_) {
case EFileFormat::CsvWithNames:
Expand All @@ -76,7 +98,16 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
if (Format_ == EFileFormat::TsvWithNames) {
options.delimiter = '\t';
}
file = CleanupCsvFile(response.Data, request, options, ctx);
file = CleanupCsvFile(data, request, options, ctx);
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
break;
}
case EFileFormat::Parquet: {
if (request.MetadataRequest) {
HandleMetadataSizeRequest(data, request, ctx);
return;
}
file = BuildParquetFileFromMetadata(data, request, ctx);
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
break;
}
@@ -104,14 +135,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
uint64_t From = 0;
uint64_t To = 0;
NActors::TActorId Requester;
bool MetadataRequest;
};

// Reading file

void HandleAsPrefixFile(TRequest&& insertedRequest, const NActors::TActorContext& ctx) {
void RequestPartialFile(TRequest&& insertedRequest, const NActors::TActorContext& ctx, uint64_t from, uint64_t to) {
auto path = insertedRequest.Path;
insertedRequest.From = 0;
insertedRequest.To = 10_MB;
insertedRequest.From = from;
insertedRequest.To = to;
auto it = InflightRequests_.try_emplace(path, std::move(insertedRequest));
Y_ABORT_UNLESS(it.second, "couldn't insert request for path: %s", insertedRequest.RequestId.AsGuidString().c_str());

@@ -135,6 +167,43 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>

// Cutting file

TMaybe<TString> DecompressFile(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
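// Decompress only a bounded prefix: at most 10 MB of decompressed data is kept, which is
// enough for schema inference. The codec comes from the "compression" attribute and must be
// one of the formats NYql::MakeDecompressor understands (gzip, zstd, lz4, brotli, bzip2, xz).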
try {
NDB::ReadBufferFromString dataBuffer(data);
auto decompressorBuffer = NYql::MakeDecompressor(dataBuffer, *DecompressionFormat_);
if (!decompressorBuffer) {
auto error = MakeError(
request.Path,
NFq::TIssuesIds::INTERNAL_ERROR,
TStringBuilder{} << "unknown compression: " << *DecompressionFormat_ << ". Use one of: gzip, zstd, lz4, brotli, bzip2, xz"
);
SendError(ctx, error);
return {};
}

TStringBuilder decompressedData;
while (!decompressorBuffer->eof() && decompressedData.size() < 10_MB) {
decompressorBuffer->nextIfAtEnd();
size_t maxDecompressedChunkSize = std::min(
decompressorBuffer->available(),
10_MB - decompressedData.size()
);
TString decompressedChunk{maxDecompressedChunkSize, ' '};
decompressorBuffer->read(&decompressedChunk.front(), maxDecompressedChunkSize);
decompressedData << decompressedChunk;
}
return std::move(decompressedData);
} catch (const yexception& error) {
auto errorEv = MakeError(
request.Path,
NFq::TIssuesIds::INTERNAL_ERROR,
TStringBuilder{} << "couldn't decompress file, check compression params: " << error.what()
);
SendError(ctx, errorEv);
return {};
}
}

std::shared_ptr<arrow::io::RandomAccessFile> CleanupCsvFile(const TString& data, const TRequest& request, const arrow::csv::ParseOptions& options, const NActors::TActorContext& ctx) {
auto chunker = arrow::csv::MakeChunker(options);
std::shared_ptr<arrow::Buffer> whole, partial;
@@ -170,6 +239,58 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
return std::make_shared<arrow::io::BufferReader>(std::move(whole));
}

void HandleMetadataSizeRequest(const TString& data, TRequest request, const NActors::TActorContext& ctx) {
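// The first partial read returned the 4-byte little-endian FileMetaData length. Once it is
// known (and sanity-checked against the 10 MB cap), re-request the whole footer below:
// FileMetaData itself plus the trailing length field and "PAR1" magic.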
uint32_t metadataSize = arrow::BitUtil::FromLittleEndian<uint32_t>(ReadUnaligned<uint32_t>(data.data()));

if (metadataSize > 10_MB) {
auto error = MakeError(
request.Path,
NFq::TIssuesIds::INTERNAL_ERROR,
TStringBuilder{} << "couldn't load parquet metadata, size is bigger than 10MB : " << metadataSize
);
SendError(ctx, error);
return;
}

InflightRequests_.erase(request.Path);

TRequest localRequest{
.Path = request.Path,
.RequestId = TGUID::Create(),
.Requester = request.Requester,
.MetadataRequest = false,
};
RequestPartialFile(std::move(localRequest), ctx, request.From - metadataSize, request.To + 4);
}

std::shared_ptr<arrow::io::RandomAccessFile> BuildParquetFileFromMetadata(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
auto arrowData = std::make_shared<arrow::Buffer>(nullptr, 0);
arrow::BufferBuilder builder;
auto buildRes = builder.Append(data.data(), data.size());
if (!buildRes.ok()) {
auto error = MakeError(
request.Path,
NFq::TIssuesIds::INTERNAL_ERROR,
TStringBuilder{} << "couldn't read data from S3Fetcher: " << buildRes.ToString()
);
SendError(ctx, error);
return nullptr;
}

buildRes = builder.Finish(&arrowData);
if (!buildRes.ok()) {
auto error = MakeError(
request.Path,
NFq::TIssuesIds::INTERNAL_ERROR,
TStringBuilder{} << "couldn't copy data from S3Fetcher: " << buildRes.ToString()
);
SendError(ctx, error);
return nullptr;
}

return std::make_shared<arrow::io::BufferReader>(std::move(arrowData));
}

// Utility
void SendError(const NActors::TActorContext& ctx, TEvFileError* error) {
auto requestIt = InflightRequests_.find(error->Path);
@@ -183,10 +304,11 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
// Fields
NActors::TActorId S3FetcherId_;
EFileFormat Format_;
TMaybe<TString> DecompressionFormat_;
std::unordered_map<TString, TRequest> InflightRequests_; // Path -> Request
};

NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format) {
return new TArrowFileFetcher{s3FetcherId, format};
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params) {
return new TArrowFileFetcher{s3FetcherId, format, params};
}
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference
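For reference, the byte ranges used by the Parquet path above follow from the Parquet footer layout: a file ends with the Thrift-serialized FileMetaData, a 4-byte little-endian length of that metadata, and the 4-byte "PAR1" magic. A minimal standalone sketch of the offset arithmetic the fetcher relies on (the struct and function names here are illustrative, not part of this PR):

#include <cstdint>

// Layout at the end of a Parquet file:
//   ... row groups ... | FileMetaData (metadataSize bytes) | metadataSize (uint32, LE) | "PAR1"
struct TFooterRanges {
    uint64_t LengthFrom, LengthTo; // first request: just the 4-byte metadata length
    uint64_t FooterFrom, FooterTo; // second request: FileMetaData + length field + magic
};

TFooterRanges ComputeFooterRanges(uint64_t fileSize, uint32_t metadataSize) {
    return {
        .LengthFrom = fileSize - 8,                // length field starts 8 bytes from the end
        .LengthTo   = fileSize - 4,                // ...and ends right before the magic
        .FooterFrom = fileSize - 8 - metadataSize, // start of FileMetaData
        .FooterTo   = fileSize,                    // include the length field and "PAR1"
    };
}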
@@ -5,5 +5,5 @@

namespace NKikimr::NExternalSource::NObjectStorage::NInference {

NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format);
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params);
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference
@@ -3,12 +3,22 @@
#include <arrow/table.h>
#include <arrow/csv/options.h>
#include <arrow/csv/reader.h>
#include <parquet/arrow/reader.h>

#include <ydb/core/external_sources/object_storage/events.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/public/api/protos/ydb_value.pb.h>

#define LOG_E(name, stream) \
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::OBJECT_STORAGE_INFERENCINATOR, name << ": " << this->SelfId() << ". " << stream)
#define LOG_I(name, stream) \
LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::OBJECT_STORAGE_INFERENCINATOR, name << ": " << this->SelfId() << ". " << stream)
#define LOG_D(name, stream) \
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::OBJECT_STORAGE_INFERENCINATOR, name << ": " << this->SelfId() << ". " << stream)
#define LOG_T(name, stream) \
LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::OBJECT_STORAGE_INFERENCINATOR, name << ": " << this->SelfId() << ". " << stream)

namespace NKikimr::NExternalSource::NObjectStorage::NInference {

@@ -202,12 +212,37 @@ std::variant<ArrowFields, TString> InferCsvTypes(std::shared_ptr<arrow::io::Rand
return table->fields();
}

std::variant<ArrowFields, TString> InferParquetTypes(std::shared_ptr<arrow::io::RandomAccessFile> file) {
parquet::arrow::FileReaderBuilder builder;
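// ArrowReaderProperties(false) disables multi-threaded reading; one thread is enough
// when only the schema is extracted.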
builder.properties(parquet::ArrowReaderProperties(false));
auto openStatus = builder.Open(std::move(file));
if (!openStatus.ok()) {
return TStringBuilder{} << "couldn't parse parquet file, check format params: " << openStatus.ToString();
}

std::unique_ptr<parquet::arrow::FileReader> reader;
auto readerStatus = builder.Build(&reader);
if (!readerStatus.ok()) {
return TStringBuilder{} << "couldn't parse parquet file, check format params: " << openStatus.ToString();
}

std::shared_ptr<arrow::Schema> schema;
auto schemaRes = reader->GetSchema(&schema);
if (!schemaRes.ok()) {
return TStringBuilder{} << "couldn't parse parquet file, check format params: " << openStatus.ToString();
}

return schema->fields();
}

std::variant<ArrowFields, TString> InferType(EFileFormat format, std::shared_ptr<arrow::io::RandomAccessFile> file, const FormatConfig& config) {
switch (format) {
case EFileFormat::CsvWithNames:
return InferCsvTypes(std::move(file), static_cast<const CsvConfig&>(config));
case EFileFormat::TsvWithNames:
return InferCsvTypes(std::move(file), static_cast<const TsvConfig&>(config));
case EFileFormat::Parquet:
return InferParquetTypes(std::move(file));
case EFileFormat::Undefined:
default:
return std::variant<ArrowFields, TString>{std::in_place_type_t<TString>{}, TStringBuilder{} << "unexpected format: " << ConvertFileFormat(format)};
@@ -240,7 +275,10 @@ std::unique_ptr<FormatConfig> MakeFormatConfig(EFileFormat format, const THashMa

class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencinator> {
public:
TArrowInferencinator(NActors::TActorId arrowFetcher, EFileFormat format, const THashMap<TString, TString>& params)
TArrowInferencinator(
NActors::TActorId arrowFetcher,
EFileFormat format,
const THashMap<TString, TString>& params)
: Format_{format}
, Config_{MakeFormatConfig(Format_, params)}
, ArrowFetcherId_{arrowFetcher}
@@ -270,7 +308,6 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::INTERNAL_ERROR, std::get<TString>(mbArrowFields)));
return;
}

auto& arrowFields = std::get<ArrowFields>(mbArrowFields);
std::vector<Ydb::Column> ydbFields;
for (const auto& field : arrowFields) {
@@ -286,7 +323,7 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
}

void HandleFileError(TEvFileError::TPtr& ev, const NActors::TActorContext& ctx) {
Cout << "TArrowInferencinator::HandleFileError" << Endl;
LOG_D("TArrowInferencinator", "HandleFileError: " << ev->Get()->Issues.ToOneLineString());
ctx.Send(RequesterId_, new TEvInferredFileSchema(ev->Get()->Path, std::move(ev->Get()->Issues)));
}

@@ -297,7 +334,11 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
NActors::TActorId RequesterId_;
};

NActors::IActor* CreateArrowInferencinator(NActors::TActorId arrowFetcher, EFileFormat format, const THashMap<TString, TString>& params) {
NActors::IActor* CreateArrowInferencinator(
NActors::TActorId arrowFetcher,
EFileFormat format,
const THashMap<TString, TString>& params) {

return new TArrowInferencinator{arrowFetcher, format, params};
}
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference