diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp
index cdad28cd3429..b4e9ff9f347c 100644
--- a/ydb/core/external_sources/object_storage.cpp
+++ b/ydb/core/external_sources/object_storage.cpp
@@ -333,7 +333,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
         }
         for (const auto& entry : entries.Objects) {
             if (entry.Size > 0) {
-                return entry.Path;
+                return entry;
             }
         }
         throw yexception() << "couldn't find any files for type inference, please check that the right path is provided";
@@ -349,30 +349,31 @@ struct TObjectStorageExternalSource : public IExternalSource {
         meta->Attributes.erase("withinfer");

         auto fileFormat = NObjectStorage::NInference::ConvertFileFormat(*format);
-        auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat));
+        auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat, meta->Attributes));
         auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId, fileFormat, meta->Attributes));

-        return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {
+        return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<NYql::NS3Lister::TObjectListEntry>& entryFut) {
             auto promise = NThreading::NewPromise<TMetadataResult>();
             auto schemaToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) {
                 if (!response.Status.IsSuccess()) {
                     metaPromise.SetValue(NYql::NCommon::ResultFromError<TMetadataResult>(response.Status.GetIssues()));
                     return;
                 }
-                TMetadataResult result;
                 meta->Changed = true;
                 meta->Schema.clear_column();
                 for (const auto& column : response.Fields) {
                     auto& destColumn = *meta->Schema.add_column();
                     destColumn = column;
                 }
+                TMetadataResult result;
                 result.SetSuccess();
                 result.Metadata = meta;
                 metaPromise.SetValue(std::move(result));
             };
+            auto [path, size, _] = entryFut.GetValue();
             actorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvInferFileSchema, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
                 arrowInferencinatorId,
-                new NObjectStorage::TEvInferFileSchema(TString{pathFut.GetValue()}),
+                new NObjectStorage::TEvInferFileSchema(TString{path}, size),
                 promise,
                 std::move(schemaToMetadata)
             ));
diff --git a/ydb/core/external_sources/object_storage/events.h b/ydb/core/external_sources/object_storage/events.h
index e10dcaced525..2ec09afa36bb 100644
--- a/ydb/core/external_sources/object_storage/events.h
+++ b/ydb/core/external_sources/object_storage/events.h
@@ -119,11 +119,13 @@ struct TEvArrowFile : public NActors::TEventLocal<TEvArrowFile, EvArrowFile> {
 };

 struct TEvInferFileSchema : public NActors::TEventLocal<TEvInferFileSchema, EvInferFileSchema> {
-    explicit TEvInferFileSchema(TString&& path)
+    explicit TEvInferFileSchema(TString&& path, ui64 size)
         : Path{std::move(path)}
+        , Size{size}
     {}

     TString Path;
+    ui64 Size = 0;
 };

 struct TEvInferredFileSchema : public NActors::TEventLocal<TEvInferredFileSchema, EvInferredFileSchema> {
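Background for the new `Size` field above: a Parquet file keeps its schema in a footer at the end of the object, terminated by a fixed 8-byte trailer, a 4-byte little-endian length of the serialized `FileMetaData` followed by the magic `PAR1`. Knowing the object size up front lets the fetcher range-read just those trailer bytes instead of downloading a 10 MB prefix. A minimal standalone sketch of that layout (`ParquetMetadataSize` is a hypothetical helper, not part of this patch; the patch itself uses `arrow::BitUtil::FromLittleEndian`):

```cpp
#include <cstdint>
#include <cstring>
#include <optional>
#include <string_view>

// A Parquet file ends with: [FileMetaData][u32 metadataSize, little-endian]["PAR1"].
// Given the last 8 bytes of an object, recover metadataSize, or nullopt if the
// trailing magic does not match. A byte swap would be needed on big-endian hosts.
std::optional<uint32_t> ParquetMetadataSize(std::string_view last8) {
    if (last8.size() != 8 || last8.substr(4) != "PAR1") {
        return std::nullopt;
    }
    uint32_t metadataSize = 0;
    std::memcpy(&metadataSize, last8.data(), sizeof(metadataSize));
    return metadataSize; // the footer occupies [fileSize - 8 - metadataSize, fileSize)
}
```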
diff --git a/ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp b/ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp
index 8e96efd739c2..4e49656eba1a 100644
--- a/ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp
+++ b/ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include

 #include
 #include
@@ -13,17 +14,24 @@
 #include
 #include
 #include
+#include
+#include

 namespace NKikimr::NExternalSource::NObjectStorage::NInference {

 class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher> {
     static constexpr uint64_t PrefixSize = 10_MB;

 public:
-    TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format)
+    TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params)
         : S3FetcherId_{s3FetcherId}
         , Format_{format}
     {
         Y_ABORT_UNLESS(IsArrowInferredFormat(Format_));
+
+        auto decompression = params.FindPtr("compression");
+        if (decompression) {
+            DecompressionFormat_ = *decompression;
+        }
     }

     void Bootstrap() {
@@ -40,15 +48,20 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
         const auto& request = *ev->Get();

         TRequest localRequest{
             .Path = request.Path,
-            .RequestId = {},
+            .RequestId = TGUID::Create(),
             .Requester = ev->Sender,
+            .MetadataRequest = false,
         };
-        CreateGuid(&localRequest.RequestId);

         switch (Format_) {
             case EFileFormat::CsvWithNames:
             case EFileFormat::TsvWithNames: {
-                HandleAsPrefixFile(std::move(localRequest), ctx);
+                RequestPartialFile(std::move(localRequest), ctx, 0, 10_MB);
+                break;
+            }
+            case EFileFormat::Parquet: {
+                localRequest.MetadataRequest = true;
+                RequestPartialFile(std::move(localRequest), ctx, request.Size - 8, request.Size - 4);
                 break;
             }
             default: {
@@ -67,6 +80,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>

         const auto& request = requestIt->second;

+        TString data = std::move(response.Data);
+        if (DecompressionFormat_) {
+            auto decompressedData = DecompressFile(data, request, ctx);
+            if (!decompressedData) {
+                return;
+            }
+            data = std::move(*decompressedData);
+        }
+
         std::shared_ptr<arrow::io::RandomAccessFile> file;
         switch (Format_) {
             case EFileFormat::CsvWithNames:
             case EFileFormat::TsvWithNames: {
                 auto options = arrow::csv::ParseOptions::Defaults();
                 if (Format_ == EFileFormat::TsvWithNames) {
                     options.delimiter = '\t';
                 }
-                file = CleanupCsvFile(response.Data, request, options, ctx);
+                file = CleanupCsvFile(data, request, options, ctx);
+                ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
+                break;
+            }
+            case EFileFormat::Parquet: {
+                if (request.MetadataRequest) {
+                    HandleMetadataSizeRequest(data, request, ctx);
+                    return;
+                }
+                file = BuildParquetFileFromMetadata(data, request, ctx);
                 ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
                 break;
             }
@@ -104,14 +135,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
         uint64_t From = 0;
         uint64_t To = 0;
         NActors::TActorId Requester;
+        bool MetadataRequest;
     };

     // Reading file

-    void HandleAsPrefixFile(TRequest&& insertedRequest, const NActors::TActorContext& ctx) {
+    void RequestPartialFile(TRequest&& insertedRequest, const NActors::TActorContext& ctx, uint64_t from, uint64_t to) {
         auto path = insertedRequest.Path;
-        insertedRequest.From = 0;
-        insertedRequest.To = 10_MB;
+        insertedRequest.From = from;
+        insertedRequest.To = to;
         auto it = InflightRequests_.try_emplace(path, std::move(insertedRequest));
         Y_ABORT_UNLESS(it.second, "couldn't insert request for path: %s", insertedRequest.RequestId.AsGuidString().c_str());
@@ -135,6 +167,43 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>

     // Cutting file

+    TMaybe<TString> DecompressFile(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
+        try {
+            NDB::ReadBufferFromString dataBuffer(data);
+            auto decompressorBuffer = NYql::MakeDecompressor(dataBuffer, *DecompressionFormat_);
+            if (!decompressorBuffer) {
+                auto error = MakeError(
+                    request.Path,
+                    NFq::TIssuesIds::INTERNAL_ERROR,
+                    TStringBuilder{} << "unknown compression: " << *DecompressionFormat_ << ". Use one of: gzip, zstd, lz4, brotli, bzip2, xz"
+                );
+                SendError(ctx, error);
+                return {};
+            }
+
+            TStringBuilder decompressedData;
+            while (!decompressorBuffer->eof() && decompressedData.size() < 10_MB) {
+                decompressorBuffer->nextIfAtEnd();
+                size_t maxDecompressedChunkSize = std::min(
+                    decompressorBuffer->available(),
+                    10_MB - decompressedData.size()
+                );
+                TString decompressedChunk{maxDecompressedChunkSize, ' '};
+                decompressorBuffer->read(&decompressedChunk.front(), maxDecompressedChunkSize);
+                decompressedData << decompressedChunk;
+            }
+            return std::move(decompressedData);
+        } catch (const yexception& error) {
+            auto errorEv = MakeError(
+                request.Path,
+                NFq::TIssuesIds::INTERNAL_ERROR,
+                TStringBuilder{} << "couldn't decompress file, check compression params: " << error.what()
+            );
+            SendError(ctx, errorEv);
+            return {};
+        }
+    }
+
     std::shared_ptr<arrow::io::RandomAccessFile> CleanupCsvFile(const TString& data, const TRequest& request, const arrow::csv::ParseOptions& options, const NActors::TActorContext& ctx) {
         auto chunker = arrow::csv::MakeChunker(options);
         std::shared_ptr<arrow::Buffer> whole, partial;
@@ -170,6 +239,58 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
         return std::make_shared<arrow::io::BufferReader>(std::move(whole));
     }

+    void HandleMetadataSizeRequest(const TString& data, TRequest request, const NActors::TActorContext& ctx) {
+        uint32_t metadataSize = arrow::BitUtil::FromLittleEndian(ReadUnaligned<uint32_t>(data.data()));
+
+        if (metadataSize > 10_MB) {
+            auto error = MakeError(
+                request.Path,
+                NFq::TIssuesIds::INTERNAL_ERROR,
+                TStringBuilder{} << "couldn't load parquet metadata, size is bigger than 10MB : " << metadataSize
+            );
+            SendError(ctx, error);
+            return;
+        }
+
+        InflightRequests_.erase(request.Path);
+
+        TRequest localRequest{
+            .Path = request.Path,
+            .RequestId = TGUID::Create(),
+            .Requester = request.Requester,
+            .MetadataRequest = false,
+        };
+        RequestPartialFile(std::move(localRequest), ctx, request.From - metadataSize, request.To + 4);
+    }
+
+    std::shared_ptr<arrow::io::RandomAccessFile> BuildParquetFileFromMetadata(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
+        auto arrowData = std::make_shared<arrow::Buffer>(nullptr, 0);
+        arrow::BufferBuilder builder;
+        auto buildRes = builder.Append(data.data(), data.size());
+        if (!buildRes.ok()) {
+            auto error = MakeError(
+                request.Path,
+                NFq::TIssuesIds::INTERNAL_ERROR,
+                TStringBuilder{} << "couldn't read data from S3Fetcher: " << buildRes.ToString()
+            );
+            SendError(ctx, error);
+            return nullptr;
+        }
+
+        buildRes = builder.Finish(&arrowData);
+        if (!buildRes.ok()) {
+            auto error = MakeError(
+                request.Path,
+                NFq::TIssuesIds::INTERNAL_ERROR,
+                TStringBuilder{} << "couldn't copy data from S3Fetcher: " << buildRes.ToString()
+            );
+            SendError(ctx, error);
+            return nullptr;
+        }
+
+        return std::make_shared<arrow::io::BufferReader>(std::move(arrowData));
+    }
+
     // Utility

     void SendError(const NActors::TActorContext& ctx, TEvFileError* error) {
         auto requestIt = InflightRequests_.find(error->Path);
@@ -183,10 +304,11 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
     // Fields
     NActors::TActorId S3FetcherId_;
     EFileFormat Format_;
+    TMaybe<TString> DecompressionFormat_;
     std::unordered_map<TString, TRequest> InflightRequests_; // Path -> Request
 };

-NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format) {
-    return new TArrowFileFetcher{s3FetcherId, format};
+NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params) {
+    return new TArrowFileFetcher{s3FetcherId, format, params};
 }

 } // namespace NKikimr::NExternalSource::NObjectStorage::NInference
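One edge case worth noting in the `Handle` hunk above: for Parquet the fetcher computes `request.Size - 8` and `request.Size - 4` on unsigned integers without first checking that the object is large enough, so an empty or truncated object would make the subtraction wrap around and produce a nonsensical range request. A defensive check (hypothetical, not part of the patch) is cheap:

```cpp
#include <cstdint>

// A well-formed Parquet file is at least 12 bytes: the leading "PAR1" magic,
// a 4-byte footer length, and the trailing "PAR1". Rejecting smaller objects
// up front avoids unsigned wrap-around in `request.Size - 8`.
constexpr uint64_t MinParquetFileSize = 12;

inline bool IsPlausibleParquetSize(uint64_t objectSize) {
    return objectSize >= MinParquetFileSize;
}
```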
diff --git a/ydb/core/external_sources/object_storage/inference/arrow_fetcher.h b/ydb/core/external_sources/object_storage/inference/arrow_fetcher.h
index dc45affccc4f..847f0c53dabd 100644
--- a/ydb/core/external_sources/object_storage/inference/arrow_fetcher.h
+++ b/ydb/core/external_sources/object_storage/inference/arrow_fetcher.h
@@ -5,5 +5,5 @@

 namespace NKikimr::NExternalSource::NObjectStorage::NInference {

-NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format);
+NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params);

 } // namespace NKikimr::NExternalSource::NObjectStorage::NInference
diff --git a/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp b/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
index 5af6ed584b8d..ffee16b8e89c 100644
--- a/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
+++ b/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
@@ -3,12 +3,22 @@
 #include
 #include
 #include
+#include

 #include
 #include
 #include
+#include

 #include

+#define LOG_E(name, stream) \
+    LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::OBJECT_STORAGE_INFERENCINATOR, name << ": " << this->SelfId() << ". " << stream)
+#define LOG_I(name, stream) \
+    LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::OBJECT_STORAGE_INFERENCINATOR, name << ": " << this->SelfId() << ". " << stream)
+#define LOG_D(name, stream) \
+    LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::OBJECT_STORAGE_INFERENCINATOR, name << ": " << this->SelfId() << ". " << stream)
+#define LOG_T(name, stream) \
+    LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::OBJECT_STORAGE_INFERENCINATOR, name << ": " << this->SelfId() << ". " << stream)
+
" << stream) namespace NKikimr::NExternalSource::NObjectStorage::NInference { @@ -202,12 +212,37 @@ std::variant InferCsvTypes(std::shared_ptrfields(); } +std::variant InferParquetTypes(std::shared_ptr file) { + parquet::arrow::FileReaderBuilder builder; + builder.properties(parquet::ArrowReaderProperties(false)); + auto openStatus = builder.Open(std::move(file)); + if (!openStatus.ok()) { + return TStringBuilder{} << "couldn't parse parquet file, check format params: " << openStatus.ToString(); + } + + std::unique_ptr reader; + auto readerStatus = builder.Build(&reader); + if (!readerStatus.ok()) { + return TStringBuilder{} << "couldn't parse parquet file, check format params: " << openStatus.ToString(); + } + + std::shared_ptr schema; + auto schemaRes = reader->GetSchema(&schema); + if (!schemaRes.ok()) { + return TStringBuilder{} << "couldn't parse parquet file, check format params: " << openStatus.ToString(); + } + + return schema->fields(); +} + std::variant InferType(EFileFormat format, std::shared_ptr file, const FormatConfig& config) { switch (format) { case EFileFormat::CsvWithNames: return InferCsvTypes(std::move(file), static_cast(config)); case EFileFormat::TsvWithNames: return InferCsvTypes(std::move(file), static_cast(config)); + case EFileFormat::Parquet: + return InferParquetTypes(std::move(file)); case EFileFormat::Undefined: default: return std::variant{std::in_place_type_t{}, TStringBuilder{} << "unexpected format: " << ConvertFileFormat(format)}; @@ -240,7 +275,10 @@ std::unique_ptr MakeFormatConfig(EFileFormat format, const THashMa class TArrowInferencinator : public NActors::TActorBootstrapped { public: - TArrowInferencinator(NActors::TActorId arrowFetcher, EFileFormat format, const THashMap& params) + TArrowInferencinator( + NActors::TActorId arrowFetcher, + EFileFormat format, + const THashMap& params) : Format_{format} , Config_{MakeFormatConfig(Format_, params)} , ArrowFetcherId_{arrowFetcher} @@ -270,7 +308,6 @@ class TArrowInferencinator : public NActors::TActorBootstrapped(mbArrowFields))); return; } - auto& arrowFields = std::get(mbArrowFields); std::vector ydbFields; for (const auto& field : arrowFields) { @@ -286,7 +323,7 @@ class TArrowInferencinator : public NActors::TActorBootstrappedGet()->Issues.ToOneLineString()); ctx.Send(RequesterId_, new TEvInferredFileSchema(ev->Get()->Path, std::move(ev->Get()->Issues))); } @@ -297,7 +334,11 @@ class TArrowInferencinator : public NActors::TActorBootstrapped& params) { +NActors::IActor* CreateArrowInferencinator( + NActors::TActorId arrowFetcher, + EFileFormat format, + const THashMap& params) { + return new TArrowInferencinator{arrowFetcher, format, params}; } } // namespace NKikimr::NExternalSource::NObjectStorage::NInference diff --git a/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp b/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp index 8dbb49f28347..0208c2ff3ba5 100644 --- a/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp +++ b/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp @@ -50,7 +50,7 @@ class ArrowInferenceTest : public testing::Test { NActors::TActorId RegisterInferencinator(TStringBuf formatStr) { auto format = NInference::ConvertFileFormat(formatStr); - auto arrowFetcher = ActorSystem.Register(NInference::CreateArrowFetchingActor(S3ActorId, format), 1); + auto arrowFetcher = ActorSystem.Register(NInference::CreateArrowFetchingActor(S3ActorId, format, {}), 1); return 
diff --git a/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp b/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp
index 8dbb49f28347..0208c2ff3ba5 100644
--- a/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp
+++ b/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp
@@ -50,7 +50,7 @@ class ArrowInferenceTest : public testing::Test {
     NActors::TActorId RegisterInferencinator(TStringBuf formatStr) {
         auto format = NInference::ConvertFileFormat(formatStr);
-        auto arrowFetcher = ActorSystem.Register(NInference::CreateArrowFetchingActor(S3ActorId, format), 1);
+        auto arrowFetcher = ActorSystem.Register(NInference::CreateArrowFetchingActor(S3ActorId, format, {}), 1);
         return ActorSystem.Register(NInference::CreateArrowInferencinator(arrowFetcher, format, {}), 1);
     }

@@ -85,7 +85,7 @@ TEST_F(ArrowInferenceTest, csv_simple) {
     auto inferencinatorId = RegisterInferencinator("csv_with_names");
     ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
-        NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}));
+        NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
     });

     std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
@@ -121,7 +121,7 @@ TEST_F(ArrowInferenceTest, tsv_simple) {
     auto inferencinatorId = RegisterInferencinator("tsv_with_names");
     ActorSystem.WrapInActorContext(EdgeActorId, [this, inferencinatorId] {
-        NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}));
+        NActors::TActivationContext::AsActorContext().Send(inferencinatorId, new TEvInferFileSchema(TString{Path}, 0));
     });

     std::unique_ptr<NActors::IEventHandle> event = ActorSystem.WaitForEdgeActorEvent({EdgeActorId});
diff --git a/ydb/core/external_sources/object_storage/inference/ut/ya.make b/ydb/core/external_sources/object_storage/inference/ut/ya.make
index 3bc00815199f..a198a6b6e196 100644
--- a/ydb/core/external_sources/object_storage/inference/ut/ya.make
+++ b/ydb/core/external_sources/object_storage/inference/ut/ya.make
@@ -1,6 +1,7 @@
 GTEST()

 PEERDIR(
+    ydb/library/yql/public/udf/service/stub
     ydb/core/external_sources/object_storage/inference
     ydb/core/external_sources/object_storage
     ydb/core/tx/scheme_board
diff --git a/ydb/core/external_sources/object_storage/inference/ya.make b/ydb/core/external_sources/object_storage/inference/ya.make
index 145f59393228..e65106ca8067 100644
--- a/ydb/core/external_sources/object_storage/inference/ya.make
+++ b/ydb/core/external_sources/object_storage/inference/ya.make
@@ -1,5 +1,11 @@
 LIBRARY()

+ADDINCL(
+    ydb/library/yql/udfs/common/clickhouse/client/base
+    ydb/library/yql/udfs/common/clickhouse/client/base/pcg-random
+    ydb/library/yql/udfs/common/clickhouse/client/src
+)
+
 SRCS(
     arrow_fetcher.cpp
     arrow_inferencinator.cpp
@@ -9,6 +15,8 @@ PEERDIR(
     contrib/libs/apache/arrow

     ydb/core/external_sources/object_storage
+
+    ydb/library/yql/providers/s3/compressors
 )

 END()
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index 686aac9bbff0..7b430f89b71f 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -338,6 +338,7 @@ enum EServiceKikimr {

     DB_POOL = 1167;
     YTS = 1168;
+    OBJECT_STORAGE_INFERENCINATOR = 1169;

     // 1024 - 1099 is reserved for nbs
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 9698745ae77d..52f4fb9909cc 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -2068,7 +2068,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadActor(
     const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry();

     TPathList paths;
-    ReadPathsList(params, taskParams, readRanges, paths);
+    ReadPathsList(taskParams, readRanges, paths);

     const auto token = secureParams.Value(params.GetToken(), TString{});
     const TS3Credentials credentials(credentialsFactory, token);
diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto
index 33884d655274..97fc6c1fab82 100644
--- a/ydb/library/yql/providers/s3/proto/source.proto
+++ b/ydb/library/yql/providers/s3/proto/source.proto
@@ -11,7 +11,7 @@ message TPath {
 message TSource {
     string Url = 1;
     string Token = 2;
-    repeated TPath DeprecatedPath = 3; // deprecated
+    reserved 3;
     optional string RowType = 4;
     optional string Format = 5;
     map<string, string> Settings = 6;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 96dd09a14184..845afba4a6a2 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -450,7 +450,7 @@ class TS3DqIntegration: public TDqIntegrationBase {
             range.Save(&out);
             paths.clear();
-            ReadPathsList(srcDesc, {}, serialized, paths);
+            ReadPathsList({}, serialized, paths);

             const NDq::TS3ReadActorFactoryConfig& readActorConfig = State_->Configuration->S3ReadActorFactoryConfig;
             ui64 fileSizeLimit = readActorConfig.FileSizeLimit;
diff --git a/ydb/library/yql/providers/s3/range_helpers/file_tree_builder_ut.cpp b/ydb/library/yql/providers/s3/range_helpers/file_tree_builder_ut.cpp
index 07ca574e18d4..9098936e78dd 100644
--- a/ydb/library/yql/providers/s3/range_helpers/file_tree_builder_ut.cpp
+++ b/ydb/library/yql/providers/s3/range_helpers/file_tree_builder_ut.cpp
@@ -107,7 +107,7 @@ Y_UNIT_TEST_SUITE(S3FileTreeBuilderTest) {
         b.Save(&range);

         TPathList paths;
-        ReadPathsList({}, MakeParams(range), {}, paths);
+        ReadPathsList(MakeParams(range), {}, paths);

         UNIT_ASSERT_VALUES_EQUAL(paths.size(), 1);
         UNIT_ASSERT_VALUES_EQUAL(paths[0].Path, "name");
@@ -124,7 +124,7 @@ Y_UNIT_TEST_SUITE(S3FileTreeBuilderTest) {
         b.Save(&range);

         TPathList paths;
-        ReadPathsList({}, MakeParams(range), {}, paths);
+        ReadPathsList(MakeParams(range), {}, paths);

         UNIT_ASSERT_VALUES_EQUAL(paths.size(), 1);
         UNIT_ASSERT_VALUES_EQUAL(paths[0].Path, "a///b");
@@ -142,7 +142,7 @@ Y_UNIT_TEST_SUITE(S3FileTreeBuilderTest) {
         b.Save(&range);

         TPathList paths;
-        ReadPathsList({}, MakeParams(range), {}, paths);
+        ReadPathsList(MakeParams(range), {}, paths);

         UNIT_ASSERT_VALUES_EQUAL(paths.size(), 2);
         UNIT_ASSERT_VALUES_EQUAL(paths[0].Path, "root/name/");
@@ -165,7 +165,7 @@ Y_UNIT_TEST_SUITE(S3FileTreeBuilderTest) {
         b.Save(&range);

         TPathList paths;
-        ReadPathsList({}, MakeParams(range), {}, paths);
+        ReadPathsList(MakeParams(range), {}, paths);

         UNIT_ASSERT_VALUES_EQUAL(paths.size(), 2);

@@ -190,7 +190,7 @@ Y_UNIT_TEST_SUITE(S3FileTreeBuilderTest) {
         b.Save(&range);

         TPathList paths;
-        ReadPathsList({}, MakeParams(range), {}, paths);
+        ReadPathsList(MakeParams(range), {}, paths);

         UNIT_ASSERT_VALUES_EQUAL(paths.size(), 3);
diff --git a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp
index 54cf3b3cd963..69b219311b73 100644
--- a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp
+++ b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp
@@ -38,7 +38,7 @@ static void BuildPathsFromTree(const google::protobuf::RepeatedPtrField<NYql::NS3::TRange::TPath>& paths,
     }
 }

-static void DecodeS3Range(const NS3::TSource& sourceDesc, const TString& data, TPathList& paths) {
+static void DecodeS3Range(const TString& data, TPathList& paths) {
     NS3::TRange range;
     TStringInput input(data);
     range.Load(&input);
@@ -53,20 +53,7 @@ static void DecodeS3Range(const TString& data, TPathList& paths) {
         return;
     }
-
-    THashMap<TString, ui64> map(sourceDesc.GetDeprecatedPath().size());
-    for (auto i = 0; i < sourceDesc.GetDeprecatedPath().size(); ++i) {
-        map.emplace(sourceDesc.GetDeprecatedPath().Get(i).GetPath(), sourceDesc.GetDeprecatedPath().Get(i).GetSize());
-    }
-
-    for (auto i = 0; i < range.GetDeprecatedPath().size(); ++i) {
-        const auto& path = range.GetDeprecatedPath().Get(i);
-        auto it = map.find(path);
-        YQL_ENSURE(it != map.end());
-        paths.emplace_back(TPath{path, it->second, false, i + startPathIndex});
-    }
 }
-void ReadPathsList(const NS3::TSource& sourceDesc, const THashMap<TString, TString>& taskParams, const TVector<TString>& readRanges, TPathList& paths) {
+void ReadPathsList(const THashMap<TString, TString>& taskParams, const TVector<TString>& readRanges, TPathList& paths) {
     if (!readRanges.empty()) {
         for (auto readRange : readRanges) {
-            DecodeS3Range(sourceDesc, readRange, paths);
+            DecodeS3Range(readRange, paths);
         }
     } else if (const auto taskParamsIt = taskParams.find(S3ProviderName); taskParamsIt != taskParams.cend()) {
-        DecodeS3Range(sourceDesc, taskParamsIt->second, paths);
-    } else {
-        for (auto i = 0; i < sourceDesc.GetDeprecatedPath().size(); ++i) {
-            paths.emplace_back(TPath{
-                sourceDesc.GetDeprecatedPath().Get(i).GetPath(),
-                sourceDesc.GetDeprecatedPath().Get(i).GetSize(),
-                false,
-                static_cast<ui64>(i)});
-        }
+        DecodeS3Range(taskParamsIt->second, paths);
     }
 }
diff --git a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h
index ab490d0cae52..5bcecceafb1d 100644
--- a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h
+++ b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h
@@ -23,7 +23,7 @@ struct TPath {
 };
 using TPathList = std::vector<TPath>;

-void ReadPathsList(const NS3::TSource& sourceDesc, const THashMap<TString, TString>& taskParams, const TVector<TString>& readRanges, TPathList& paths);
+void ReadPathsList(const THashMap<TString, TString>& taskParams, const TVector<TString>& readRanges, TPathList& paths);

 void PackPathsList(const TPathList& paths, TString& packed, bool& isTextEncoded);
 void UnpackPathsList(TStringBuf packed, bool isTextEncoded, TPathList& paths);
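With `DeprecatedPath` gone from `TSource` (field 3 is now `reserved`, so its number cannot be reused), the packed tree in `TRange` or in the task parameters is the only channel left for path lists, which is exactly what the deleted fallback branches used to cover. The pack/unpack helpers declared in `path_list_reader.h` above round-trip a `TPathList`; a minimal usage sketch (the namespace and the `TPath` field order are assumed from the header, and the values are illustrative):

```cpp
#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h>

using namespace NYql::NS3Details; // assumed namespace of path_list_reader.h

void PathListRoundTrip() {
    TPathList paths;
    // TPath fields as used above: Path, Size, IsDirectory, PathIndex.
    paths.emplace_back(TPath{"some/folder/file.parquet", 1024, false, 0});

    TString packed;
    bool isTextEncoded = false;
    PackPathsList(paths, packed, isTextEncoded); // serialize for task params

    TPathList restored;
    UnpackPathsList(packed, isTextEncoded, restored); // consumer side
}
```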
diff --git a/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp b/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp
index 8e28d06e0aaa..61ea7bf1dcf1 100644
--- a/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp
+++ b/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp
@@ -18,63 +18,6 @@ Y_UNIT_TEST_SUITE(PathListReaderTest) {
         return map;
     }

-    Y_UNIT_TEST(ReadsFilesListFromSourceSettings) {
-        NS3::TSource src;
-        {
-            auto* p = src.AddDeprecatedPath();
-            p->SetPath("my/path");
-            p->SetSize(100500);
-        }
-        {
-            auto* p = src.AddDeprecatedPath();
-            p->SetPath("other/path");
-            p->SetSize(1);
-        }
-
-        TPathList paths;
-        ReadPathsList(src, {}, {}, paths);
-
-        UNIT_ASSERT_VALUES_EQUAL(paths.size(), 2);
-
-        UNIT_ASSERT_VALUES_EQUAL(paths[0].Path, "my/path");
-        UNIT_ASSERT_VALUES_EQUAL(paths[0].Size, 100500);
-        UNIT_ASSERT_VALUES_EQUAL(paths[0].IsDirectory, false);
-        UNIT_ASSERT_VALUES_EQUAL(paths[0].PathIndex, 0);
-
-        UNIT_ASSERT_VALUES_EQUAL(paths[1].Path, "other/path");
-        UNIT_ASSERT_VALUES_EQUAL(paths[1].Size, 1);
-        UNIT_ASSERT_VALUES_EQUAL(paths[1].IsDirectory, false);
-        UNIT_ASSERT_VALUES_EQUAL(paths[1].PathIndex, 1);
-    }
-
-    Y_UNIT_TEST(ReadsFilesListFromParamsAndSourceSettings) {
-        NS3::TSource src;
-        {
-            auto* p = src.AddDeprecatedPath();
-            p->SetPath("my/path");
-            p->SetSize(100500);
-        }
-        {
-            auto* p = src.AddDeprecatedPath();
-            p->SetPath("other/path");
-            p->SetSize(1);
-        }
-
-        NS3::TRange range;
-        range.SetStartPathIndex(42);
-        range.AddDeprecatedPath("my/path");
-
-        TPathList paths;
-        ReadPathsList(src, MakeParams(range), {}, paths);
-
-        UNIT_ASSERT_VALUES_EQUAL(paths.size(), 1);
-
-        UNIT_ASSERT_VALUES_EQUAL(paths[0].Path, "my/path");
-        UNIT_ASSERT_VALUES_EQUAL(paths[0].Size, 100500);
-        UNIT_ASSERT_VALUES_EQUAL(paths[0].IsDirectory, false);
-        UNIT_ASSERT_VALUES_EQUAL(paths[0].PathIndex, 42);
-    }
-
     NYql::NS3::TRange::TPath* SetPath(NYql::NS3::TRange::TPath* path, const TString& name = {}, ui64 size = 0, bool read = false) {
         path->SetName(name);
         path->SetSize(size);
@@ -84,11 +27,6 @@ Y_UNIT_TEST_SUITE(PathListReaderTest) {

     Y_UNIT_TEST(ReadsFilesListFromTreeParams) {
         NS3::TSource src;
-        {
-            auto* p = src.AddDeprecatedPath();
-            p->SetPath("my/path");
-            p->SetSize(100500);
-        }

         NS3::TRange range;
         range.SetStartPathIndex(42);
@@ -110,7 +48,7 @@ Y_UNIT_TEST_SUITE(PathListReaderTest) {
         }

         TPathList paths;
-        ReadPathsList(src, MakeParams(range), {}, paths);
+        ReadPathsList(MakeParams(range), {}, paths);

         UNIT_ASSERT_VALUES_EQUAL(paths.size(), 5);
diff --git a/ydb/tests/fq/s3/test_compression_data/test.csv.br b/ydb/tests/fq/s3/test_compression_data/test.csv.br
new file mode 100644
index 000000000000..c348d124102b
Binary files /dev/null and b/ydb/tests/fq/s3/test_compression_data/test.csv.br differ
diff --git a/ydb/tests/fq/s3/test_compression_data/test.csv.bz2 b/ydb/tests/fq/s3/test_compression_data/test.csv.bz2
new file mode 100644
index 000000000000..b6f7b4df48fa
Binary files /dev/null and b/ydb/tests/fq/s3/test_compression_data/test.csv.bz2 differ
diff --git a/ydb/tests/fq/s3/test_compression_data/test.csv.gz b/ydb/tests/fq/s3/test_compression_data/test.csv.gz
new file mode 100644
index 000000000000..4255d4e01367
Binary files /dev/null and b/ydb/tests/fq/s3/test_compression_data/test.csv.gz differ
diff --git a/ydb/tests/fq/s3/test_compression_data/test.csv.lz4 b/ydb/tests/fq/s3/test_compression_data/test.csv.lz4
new file mode 100644
index 000000000000..641c66973f92
Binary files /dev/null and b/ydb/tests/fq/s3/test_compression_data/test.csv.lz4 differ
diff --git a/ydb/tests/fq/s3/test_compression_data/test.csv.xz b/ydb/tests/fq/s3/test_compression_data/test.csv.xz
new file mode 100644
index 000000000000..efe9ea711f30
Binary files /dev/null and b/ydb/tests/fq/s3/test_compression_data/test.csv.xz differ
diff --git a/ydb/tests/fq/s3/test_compression_data/test.csv.zst b/ydb/tests/fq/s3/test_compression_data/test.csv.zst
new file mode 100644
index 000000000000..65d58a25a82f
Binary files /dev/null and b/ydb/tests/fq/s3/test_compression_data/test.csv.zst differ
diff --git a/ydb/tests/fq/s3/test_compressions.py b/ydb/tests/fq/s3/test_compressions.py
index 76aa87989a1b..718daad85b67 100644
--- a/ydb/tests/fq/s3/test_compressions.py
+++ b/ydb/tests/fq/s3/test_compressions.py
@@ -10,7 +10,7 @@
 import ydb.public.api.protos.draft.fq_pb2 as fq
 import ydb.tests.fq.s3.s3_helpers as s3_helpers
-from ydb.tests.tools.fq_runner.kikimr_utils import yq_all
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_all, yq_v2


 class TestS3Compressions:
@@ -31,6 +31,26 @@ def validate_result(self, result_set):
         assert result_set.rows[0].items[0].bytes_value == b"yq"
         assert result_set.rows[0].items[1].int32_value == 0
         assert result_set.rows[0].items[2].bytes_value == b"abc"
+
+    def validate_result_csv(self, result_set):
+        logging.debug(str(result_set))
+        assert len(result_set.columns) == 3
+        assert result_set.columns[0].name == "a"
+        assert result_set.columns[0].type.optional_type.item.type_id == ydb.Type.INT64
+        assert result_set.columns[1].name == "b"
+        assert result_set.columns[1].type.type_id == ydb.Type.UTF8
+        assert result_set.columns[2].name == "c"
+        assert result_set.columns[2].type.optional_type.item.type_id == ydb.Type.DOUBLE
+        assert len(result_set.rows) == 3
+        assert result_set.rows[0].items[0].int64_value == 1
+        assert result_set.rows[0].items[1].text_value == "hello"
+        assert result_set.rows[0].items[2].double_value == 0.5
+        assert result_set.rows[1].items[0].int64_value == 2
+        assert result_set.rows[1].items[1].text_value == "world"
+        assert result_set.rows[1].items[2].double_value == 0.25
+        assert result_set.rows[2].items[0].int64_value == 3
+        assert result_set.rows[2].items[1].text_value == "!"
+        assert result_set.rows[2].items[2].double_value == 0.125

     @yq_all
     @pytest.mark.parametrize("filename, compression", [
@@ -62,6 +82,35 @@ def test_compression(self, kikimr, s3, client, filename, compression):
         result_set = data.result.result_set
         self.validate_result(result_set)

+    @yq_v2
+    @pytest.mark.parametrize(
+        "filename, compression",
+        [
+            ("test.csv.gz", "gzip"),
+            ("test.csv.lz4", "lz4"),
+            ("test.csv.br", "brotli"),
+            ("test.csv.bz2", "bzip2"),
+            ("test.csv.zst", "zstd"),
+            ("test.csv.xz", "xz"),
+        ],
+    )
+    def test_compression_inference(self, kikimr, s3, client, filename, compression):
+        self.create_bucket_and_upload_file(filename, s3, kikimr)
+        client.create_storage_connection("fruitbucket", "fbucket")
+
+        sql = '''
+            SELECT *
+            FROM fruitbucket.`{}`
+            WITH (format=csv_with_names, compression="{}", with_infer="true");
+            '''.format(filename, compression)
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+
+        data = client.get_result_data(query_id)
+        result_set = data.result.result_set
+        self.validate_result_csv(result_set)
+
     @yq_all
     @pytest.mark.parametrize("filename, compression", [
         ("big.json.gz", "gzip"),
@@ -142,3 +191,38 @@ def test_invalid_compression(self, kikimr, s3, client):
         logging.debug("Describe result: {}".format(describe_result))
         describe_string = "{}".format(describe_result)
         assert "Unknown compression: some_compression. Use one of: gzip, zstd, lz4, brotli, bzip2, xz" in describe_string
+
+    @yq_v2
+    @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+    def test_invalid_compression_inference(self, kikimr, s3, client):
+        resource = boto3.resource(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        bucket = resource.Bucket("fbucket")
+        bucket.create(ACL='public-read')
+
+        s3_client = boto3.client(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        fruits = R'''[{"name" : "banana", "price" : 3, "weight" : 100}]'''
+        s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.json', ContentType='text/plain')
+        kikimr.control_plane.wait_bootstrap(1)
+
+        client.create_storage_connection("fruitbucket", "fbucket")
+
+        sql = fR'''
+            SELECT *
+            FROM fruitbucket.`fruits.json`
+            WITH (format=csv_with_names, compression="gzip", with_infer="true");
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+        describe_result = client.describe_query(query_id).result
+        logging.debug("Describe result: {}".format(describe_result))
+        describe_string = "{}".format(describe_result)
+        assert (
+            "couldn\\'t decompress file, check compression params:" in describe_string
+        )
diff --git a/ydb/tests/fq/s3/test_formats.py b/ydb/tests/fq/s3/test_formats.py
index 608a49e04bce..7ca4caf10ce6 100644
--- a/ydb/tests/fq/s3/test_formats.py
+++ b/ydb/tests/fq/s3/test_formats.py
@@ -11,7 +11,7 @@
 import ydb.public.api.protos.draft.fq_pb2 as fq
 import ydb.tests.fq.s3.s3_helpers as s3_helpers
-from ydb.tests.tools.fq_runner.kikimr_utils import yq_all, yq_stats_full
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_all, yq_v2, yq_stats_full

 class TestS3Formats:
@@ -38,6 +38,26 @@ def validate_result(self, result_set):
         assert result_set.rows[2].items[0].bytes_value == b"Pear"
         assert result_set.rows[2].items[1].int32_value == 15
         assert result_set.rows[2].items[2].int32_value == 33
+
+    def validate_result_inference(self, result_set):
+        logging.debug(str(result_set))
+        assert len(result_set.columns) == 3
+        assert result_set.columns[0].name == "Fruit"
+        assert result_set.columns[0].type.type_id == ydb.Type.UTF8
+        assert result_set.columns[1].name == "Price"
+        assert result_set.columns[1].type.optional_type.item.type_id == ydb.Type.INT64
+        assert result_set.columns[2].name == "Weight"
+        assert result_set.columns[2].type.optional_type.item.type_id == ydb.Type.INT64
+        assert len(result_set.rows) == 3
+        assert result_set.rows[0].items[0].text_value == "Banana"
+        assert result_set.rows[0].items[1].int64_value == 3
+        assert result_set.rows[0].items[2].int64_value == 100
+        assert result_set.rows[1].items[0].text_value == "Apple"
+        assert result_set.rows[1].items[1].int64_value == 2
+        assert result_set.rows[1].items[2].int64_value == 22
+        assert result_set.rows[2].items[0].text_value == "Pear"
+        assert result_set.rows[2].items[1].int64_value == 15
+        assert result_set.rows[2].items[2].int64_value == 33

     def validate_pg_result(self, result_set):
         logging.debug(str(result_set))
@@ -100,6 +120,32 @@ def test_format(self, kikimr, s3, client, filename, type_format, yq_version):
         if type_format != "json_list":
             assert stat["ResultSet"]["IngressRows"]["sum"] == 3

+    @yq_v2
+    @pytest.mark.parametrize(
+        "filename, type_format",
+        [
+            ("test.csv", "csv_with_names"),
+            ("test.tsv", "tsv_with_names"),
+            ("test.parquet", "parquet"),
+        ],
+    )
+    def test_format_inference(self, kikimr, s3, client, filename, type_format):
+        self.create_bucket_and_upload_file(filename, s3, kikimr)
+        client.create_storage_connection("fruitbucket", "fbucket")
+
+        sql = f'''
+            SELECT *
+            FROM fruitbucket.`{filename}`
+            WITH (format=`{type_format}`, with_infer='true');
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+
+        data = client.get_result_data(query_id)
+        result_set = data.result.result_set
+        self.validate_result_inference(result_set)
+
     @yq_all
     def test_btc(self, kikimr, s3, client, yq_version):
         self.create_bucket_and_upload_file("btct.parquet", s3, kikimr)