diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index 0607f63dd45c..e4f5d180cb7a 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -15,8 +15,9 @@ message TRowDispatcherCoordinatorConfig { } message TJsonParserConfig { - uint64 BatchSizeBytes = 1; + uint64 BatchSizeBytes = 1; // default 1 MiB uint64 BatchCreationTimeoutMs = 2; + uint64 BufferCellCount = 3; // (number rows) * (number columns) limit, default 10^6 } message TRowDispatcherConfig { diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index c510094be3a9..2977f6d03bb6 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -71,6 +71,17 @@ NYT::TNode MakeOutputSchema() { return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers)); } +struct TInputType { + const TVector& Offsets; + const TVector*>& Values; + const ui64 RowsOffset; // offset of first value + const ui64 NumberRows; + + ui64 GetOffset(ui64 rowId) const { + return Offsets[rowId + RowsOffset]; + } +}; + class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase { public: TFilterInputSpec(const NYT::TNode& schema) @@ -85,7 +96,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase { TVector Schemas; }; -class TFilterInputConsumer : public NYql::NPureCalc::IConsumer&, const TVector&>> { +class TFilterInputConsumer : public NYql::NPureCalc::IConsumer { public: TFilterInputConsumer( const TFilterInputSpec& spec, @@ -123,36 +134,38 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer&, const TVector&> values) override { - Y_ENSURE(FieldsPositions.size() == values.second.size()); + void OnObject(TInputType input) override { + Y_ENSURE(FieldsPositions.size() == input.Values.size()); NKikimr::NMiniKQL::TThrowingBindTerminator bind; with_lock (Worker->GetScopedAlloc()) { + Y_DEFER { + // Clear cache after each object because + // values allocated on another allocator and should be released + Cache.Clear(); + Worker->GetGraph().Invalidate(); + }; + auto& holderFactory = Worker->GetGraph().GetHolderFactory(); // TODO: use blocks here - for (size_t rowId = 0; rowId < values.second.front()->size(); ++rowId) { + for (size_t rowId = 0; rowId < input.NumberRows; ++rowId) { NYql::NUdf::TUnboxedValue* items = nullptr; NYql::NUdf::TUnboxedValue result = Cache.NewArray( holderFactory, - static_cast(values.second.size() + 1), + static_cast(input.Values.size() + 1), items); - items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first[rowId]); + items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(input.GetOffset(rowId)); size_t fieldId = 0; - for (const auto& column : values.second) { + for (const auto column : input.Values) { items[FieldsPositions[fieldId++]] = column->at(rowId); } Worker->Push(std::move(result)); } - - // Clear cache after each object because - // values allocated on another allocator and should be released - Cache.Clear(); - Worker->GetGraph().Invalidate(); } } @@ -236,7 +249,7 @@ struct NYql::NPureCalc::TInputSpecTraits { static constexpr bool IsPartial = false; static constexpr bool SupportPushStreamMode = true; - using TConsumerType = THolder&, const TVector&>>>; + using TConsumerType = THolder>; static TConsumerType MakeConsumer( const TFilterInputSpec& spec, @@ -282,9 +295,9 @@ class TJsonFilter::TImpl { LOG_ROW_DISPATCHER_DEBUG("Program created"); 
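// [Editor's note] The template arguments in this hunk were stripped during rendering.
// The sketch below is a hedged reconstruction of the new TInputType, assuming the element
// types used elsewhere in this patch (ui64 message offsets and one
// TVector<NYql::NUdf::TUnboxedValue> per column); the name TInputTypeSketch and the
// reconstruction itself are illustrative, not authoritative.
struct TInputTypeSketch {
    const TVector<ui64>& Offsets;                                     // offsets of the whole parsed batch
    const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& Values; // one column vector per requested field
    const ui64 RowsOffset;                                            // index of the first row of this window
    const ui64 NumberRows;                                            // number of rows pushed starting from that index

    ui64 GetOffset(ui64 rowId) const {
        return Offsets[rowId + RowsOffset]; // rowId is local to the window
    }
};
// With this shape, TJsonFilter::Push(offsets, values, rowsOffset, numberRows) (see the hunk
// just below) can hand the consumer a sub-range of an already parsed batch without copying
// column data: the consumer iterates rowId in [0, NumberRows) and reads column->at(rowId).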
} - void Push(const TVector& offsets, const TVector& values) { + void Push(const TVector& offsets, const TVector*>& values, ui64 rowsOffset, ui64 numberRows) { Y_ENSURE(values, "Expected non empty schema"); - InputConsumer->OnObject(std::make_pair(offsets, values)); + InputConsumer->OnObject({.Offsets = offsets, .Values = values, .RowsOffset = rowsOffset, .NumberRows = numberRows}); } TString GetSql() const { @@ -305,7 +318,7 @@ class TJsonFilter::TImpl { private: THolder> Program; - THolder&, const TVector&>>> InputConsumer; + THolder> InputConsumer; const TString Sql; }; @@ -322,8 +335,8 @@ TJsonFilter::TJsonFilter( TJsonFilter::~TJsonFilter() { } -void TJsonFilter::Push(const TVector& offsets, const TVector& values) { - Impl->Push(offsets, values); +void TJsonFilter::Push(const TVector& offsets, const TVector*>& values, ui64 rowsOffset, ui64 numberRows) { + Impl->Push(offsets, values, rowsOffset, numberRows); } TString TJsonFilter::GetSql() { diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.h b/ydb/core/fq/libs/row_dispatcher/json_filter.h index 6d1cebf9338c..51c0ff1581f7 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.h +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.h @@ -2,8 +2,6 @@ #include "common.h" -#include -#include #include namespace NFq { @@ -23,7 +21,7 @@ class TJsonFilter { ~TJsonFilter(); - void Push(const TVector& offsets, const TVector& values); + void Push(const TVector& offsets, const TVector*>& values, ui64 rowsOffset, ui64 numberRows); TString GetSql(); private: diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 99ee146c68df..41428a939a99 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -4,9 +4,11 @@ #include #include +#include #include #include #include +#include #include #include @@ -17,6 +19,9 @@ namespace { TString LogPrefix = "JsonParser: "; +constexpr ui64 DEFAULT_BATCH_SIZE = 1_MB; +constexpr ui64 DEFAULT_BUFFER_CELL_COUNT = 1000000; + struct TJsonParserBuffer { size_t NumberValues = 0; bool Finished = false; @@ -36,20 +41,11 @@ struct TJsonParserBuffer { Offsets.reserve(numberValues); } - void AddMessages(const TVector& messages) { + void AddMessage(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) { Y_ENSURE(!Finished, "Cannot add messages into finished buffer"); - - size_t messagesSize = 0; - for (const auto& message : messages) { - messagesSize += message.GetData().size(); - } - - NumberValues += messages.size(); - Reserve(Values.size() + messagesSize, NumberValues); - for (const auto& message : messages) { - Values << message.GetData(); - Offsets.emplace_back(message.GetOffset()); - } + NumberValues++; + Values << message.GetData(); + Offsets.emplace_back(message.GetOffset()); } std::pair Finish() { @@ -80,16 +76,16 @@ class TColumnParser { const TString TypeYson; const NKikimr::NMiniKQL::TType* TypeMkql; const bool IsOptional = false; - size_t NumberValues = 0; + TVector ParsedRows; public: - TColumnParser(const TString& name, const TString& typeYson, NKikimr::NMiniKQL::TProgramBuilder& programBuilder) + TColumnParser(const TString& name, const TString& typeYson, ui64 maxNumberRows, NKikimr::NMiniKQL::TProgramBuilder& programBuilder) : Name(name) , TypeYson(typeYson) , TypeMkql(NYql::NCommon::ParseTypeFromYson(TStringBuf(typeYson), programBuilder, Cerr)) , IsOptional(TypeMkql->IsOptional()) - , NumberValues(0) { + ParsedRows.reserve(maxNumberRows); try { Parser = 
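// [Editor's note] The new BufferCellCount setting bounds (rows in flight) x (columns), so the
// parser derives a per-flush row limit with a ceiling division, as in the TImpl initializer
// further below:
//   MaxNumberRows = ((bufferCellCount ? bufferCellCount : DEFAULT_BUFFER_CELL_COUNT) - 1) / columns.size() + 1
// A minimal, self-contained illustration of that arithmetic (the names here are hypothetical
// and not part of the patch):
//
//   constexpr ui64 DefaultBufferCellCount = 1'000'000;
//
//   ui64 MaxRowsPerFlush(ui64 bufferCellCount, size_t columnCount) {
//       const ui64 cells = bufferCellCount ? bufferCellCount : DefaultBufferCellCount;
//       return (cells - 1) / columnCount + 1; // ceil(cells / columnCount), always >= 1
//   }
//
//   // e.g. 10 columns with the default budget -> 100'000 rows per callback invocation.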
CreateParser(TypeMkql); } catch (...) { @@ -97,14 +93,14 @@ class TColumnParser { } } - void ParseJsonValue(simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { + void ParseJsonValue(ui64 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { + ParsedRows.emplace_back(rowId); Parser(jsonValue, resultValue); - NumberValues++; } void ValidateNumberValues(size_t expectedNumberValues, ui64 firstOffset) const { - if (Y_UNLIKELY(!IsOptional && NumberValues < expectedNumberValues)) { - throw yexception() << "Failed to parse json messages, found " << expectedNumberValues - NumberValues << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson; + if (Y_UNLIKELY(!IsOptional && ParsedRows.size() < expectedNumberValues)) { + throw yexception() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRows.size() << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson; } } @@ -273,11 +269,13 @@ namespace NFq { class TJsonParser::TImpl { public: - TImpl(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout) + TImpl(const TVector& columns, const TVector& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount) : Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) , TypeEnv(std::make_unique(Alloc)) - , BatchSize(batchSize) + , BatchSize(batchSize ? batchSize : DEFAULT_BATCH_SIZE) + , MaxNumberRows(((bufferCellCount ? bufferCellCount : DEFAULT_BUFFER_CELL_COUNT) - 1) / columns.size() + 1) , BatchCreationTimeout(batchCreationTimeout) + , ParseCallback(parseCallback) , ParsedValues(columns.size()) { Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal"); @@ -288,7 +286,7 @@ class TJsonParser::TImpl { Columns.reserve(columns.size()); for (size_t i = 0; i < columns.size(); i++) { - Columns.emplace_back(columns[i], types[i], programBuilder); + Columns.emplace_back(columns[i], types[i], MaxNumberRows, programBuilder); } } @@ -297,7 +295,11 @@ class TJsonParser::TImpl { ColumnsIndex.emplace(std::string_view(Columns[i].Name), i); } - Buffer.Reserve(BatchSize, 1); + for (size_t i = 0; i < columns.size(); i++) { + ParsedValues[i].resize(MaxNumberRows); + } + + Buffer.Reserve(BatchSize, MaxNumberRows); LOG_ROW_DISPATCHER_INFO("Simdjson active implementation " << simdjson::get_active_implementation()->name()); Parser.threaded = false; @@ -320,31 +322,34 @@ class TJsonParser::TImpl { } void AddMessages(const TVector& messages) { - if (messages.empty()) { - return; - } - - if (Buffer.Finished) { - Buffer.Clear(); + Y_ENSURE(!Buffer.Finished, "Cannot add messages into finished buffer"); + for (const auto& message : messages) { + Buffer.AddMessage(message); + if (Buffer.IsReady() && Buffer.GetSize() >= BatchSize) { + Parse(); + } } - Buffer.AddMessages(messages); } - const TVector& Parse() { + void Parse() { Y_ENSURE(Buffer.IsReady(), "Nothing to parse"); const auto [values, size] = Buffer.Finish(); LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values); with_lock (Alloc) { - ClearColumns(Buffer.NumberValues); + Y_DEFER { + // Clear all UV in case of exception + ClearColumns(); + Buffer.Clear(); + }; - const ui64 firstOffset = Buffer.Offsets.front(); size_t rowId = 0; + size_t parsedRows = 0; simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, 
simdjson::ondemand::DEFAULT_BATCH_SIZE); for (auto document : documents) { - if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) { - throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << rowId + 1; + if (Y_UNLIKELY(parsedRows >= Buffer.NumberValues)) { + throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << parsedRows + 1; } for (auto item : document.get_object()) { const auto it = ColumnsIndex.find(item.escaped_key().value()); @@ -355,23 +360,27 @@ class TJsonParser::TImpl { const size_t columnId = it->second; auto& columnParser = Columns[columnId]; try { - columnParser.ParseJsonValue(item.value(), ParsedValues[columnId][rowId]); + columnParser.ParseJsonValue(rowId, item.value(), ParsedValues[columnId][rowId]); } catch (...) { throw yexception() << "Failed to parse json string at offset " << Buffer.Offsets[rowId] << ", got parsing error for column '" << columnParser.Name << "' with type " << columnParser.TypeYson << ", description: " << CurrentExceptionMessage(); } } + rowId++; + parsedRows++; + if (rowId == MaxNumberRows) { + FlushColumns(parsedRows, MaxNumberRows); + rowId = 0; + } } - if (rowId != Buffer.NumberValues) { - throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << rowId; + if (Y_UNLIKELY(parsedRows != Buffer.NumberValues)) { + throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId; } - for (const auto& columnDesc : Columns) { - columnDesc.ValidateNumberValues(rowId, firstOffset); + if (rowId) { + FlushColumns(parsedRows, rowId); } } - - return ParsedValues; } TString GetDescription() const { @@ -385,7 +394,6 @@ class TJsonParser::TImpl { ~TImpl() { with_lock (Alloc) { - ClearColumns(0); ParsedValues.clear(); Columns.clear(); TypeEnv.reset(); @@ -393,18 +401,29 @@ class TJsonParser::TImpl { } private: - void ClearColumns(size_t newSize) { - const auto clearValue = [&allocState = Alloc.Ref()](NYql::NUdf::TUnboxedValue& value){ - value.UnlockRef(1); - value.Clear(); - }; + void FlushColumns(size_t parsedRows, size_t savedRows) { + const ui64 firstOffset = Buffer.Offsets.front(); + for (const auto& column : Columns) { + column.ValidateNumberValues(savedRows, firstOffset); + } - for (size_t i = 0; i < Columns.size(); ++i) { - Columns[i].NumberValues = 0; + { + auto unguard = Unguard(Alloc); + ParseCallback(parsedRows - savedRows, savedRows, ParsedValues); + } + ClearColumns(); + } + + void ClearColumns() { + for (size_t i = 0; i < Columns.size(); ++i) { auto& parsedColumn = ParsedValues[i]; - std::for_each(parsedColumn.begin(), parsedColumn.end(), clearValue); - parsedColumn.resize(newSize); + for (size_t rowId : Columns[i].ParsedRows) { + auto& parsedRow = parsedColumn[rowId]; + parsedRow.UnlockRef(1); + parsedRow.Clear(); + } + Columns[i].ParsedRows.clear(); } } @@ -413,18 +432,20 @@ class TJsonParser::TImpl { std::unique_ptr TypeEnv; const ui64 BatchSize; + const ui64 MaxNumberRows; const TDuration BatchCreationTimeout; + const TCallback ParseCallback; TVector Columns; absl::flat_hash_map ColumnsIndex; TJsonParserBuffer Buffer; simdjson::ondemand::parser Parser; - TVector ParsedValues; + TVector> ParsedValues; }; -TJsonParser::TJsonParser(const TVector& columns, const TVector& 
types, ui64 batchSize, TDuration batchCreationTimeout) - : Impl(std::make_unique(columns, types, batchSize, batchCreationTimeout)) +TJsonParser::TJsonParser(const TVector& columns, const TVector& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount) + : Impl(std::make_unique(columns, types, parseCallback, batchSize, batchCreationTimeout, bufferCellCount)) {} TJsonParser::~TJsonParser() { @@ -450,16 +471,16 @@ const TVector& TJsonParser::GetOffsets() const { return Impl->GetOffsets(); } -const TVector& TJsonParser::Parse() { - return Impl->Parse(); +void TJsonParser::Parse() { + Impl->Parse(); } TString TJsonParser::GetDescription() const { return Impl->GetDescription(); } -std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout) { - return std::unique_ptr(new TJsonParser(columns, types, batchSize, batchCreationTimeout)); +std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount) { + return std::unique_ptr(new TJsonParser(columns, types, parseCallback, batchSize, batchCreationTimeout, bufferCellCount)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.h b/ydb/core/fq/libs/row_dispatcher/json_parser.h index 0b5b74de8642..77c6cccf9f6e 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.h @@ -1,14 +1,17 @@ #pragma once -#include - #include +#include + namespace NFq { class TJsonParser { public: - TJsonParser(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout); + using TCallback = std::function>& parsedValues)>; + +public: + TJsonParser(const TVector& columns, const TVector& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount); ~TJsonParser(); bool IsReady() const; @@ -17,7 +20,7 @@ class TJsonParser { const TVector& GetOffsets() const; void AddMessages(const TVector& messages); - const TVector& Parse(); + void Parse(); TString GetDescription() const; @@ -26,6 +29,6 @@ class TJsonParser { const std::unique_ptr Impl; }; -std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout); +std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 841ed7cf2d31..32a2d36871b8 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -208,7 +208,7 @@ class TTopicSession : public TActorBootstrapped { void SubscribeOnNextEvent(); void SendToParsing(const TVector& messages); void DoParsing(bool force = false); - void DoFiltering(const TVector& offsets, const TVector& parsedValues); + void DoFiltering(ui64 rowsOffset, ui64 numberRows, const TVector>& parsedValues); void SendData(TClientsInfo& info); void UpdateParser(); void FatalError(const TString& message, const std::unique_ptr* filter, bool addParserDescription); @@ -235,7 +235,7 @@ class TTopicSession : public TActorBootstrapped { void SendStatistic(); void SendSessionError(NActors::TActorId readActorId, const TString& message); - TVector RebuildJson(const 
TClientsInfo& info, const TVector& parsedValues); + TVector*> RebuildJson(const TClientsInfo& info, const TVector>& parsedValues); void UpdateParserSchema(const TParserInputType& inputType); void UpdateFieldsIds(TClientsInfo& clientInfo); bool CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev); @@ -422,8 +422,8 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&) { CreateTopicSession(); } -TVector TTopicSession::RebuildJson(const TClientsInfo& info, const TVector& parsedValues) { - TVector result; +TVector*> TTopicSession::RebuildJson(const TClientsInfo& info, const TVector>& parsedValues) { + TVector*> result; const auto& offsets = ParserSchema.FieldsMap; result.reserve(info.FieldsIds.size()); for (auto fieldId : info.FieldsIds) { @@ -623,21 +623,23 @@ void TTopicSession::DoParsing(bool force) { LOG_ROW_DISPATCHER_TRACE("SendToParsing, first offset: " << Parser->GetOffsets().front() << ", number values in buffer " << Parser->GetOffsets().size()); try { - const auto& parsedValues = Parser->Parse(); - DoFiltering(Parser->GetOffsets(), parsedValues); + Parser->Parse(); } catch (const std::exception& e) { FatalError(e.what(), nullptr, true); } } -void TTopicSession::DoFiltering(const TVector& offsets, const TVector& parsedValues) { +void TTopicSession::DoFiltering(ui64 rowsOffset, ui64 numberRows, const TVector>& parsedValues) { + const auto& offsets = Parser->GetOffsets(); + Y_ENSURE(rowsOffset < offsets.size(), "Invalid first row ofset"); + Y_ENSURE(numberRows, "Expected non empty parsed batch"); Y_ENSURE(parsedValues, "Expected non empty schema"); - LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets.front() << ", last offset: " << offsets.back()); + LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets[rowsOffset] << ", last offset: " << offsets[rowsOffset + numberRows - 1]); for (auto& [actorId, info] : Clients) { try { if (info.Filter) { - info.Filter->Push(offsets, RebuildJson(info, parsedValues)); + info.Filter->Push(offsets, RebuildJson(info, parsedValues), rowsOffset, numberRows); } } catch (const std::exception& e) { FatalError(e.what(), &info.Filter, false); @@ -866,7 +868,9 @@ void TTopicSession::UpdateParser() { LOG_ROW_DISPATCHER_TRACE("Init JsonParser with columns: " << JoinSeq(',', names)); const auto& parserConfig = Config.GetJsonParser(); - Parser = NewJsonParser(names, types, parserConfig.GetBatchSizeBytes(), TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs())); + Parser = NewJsonParser(names, types, [this](ui64 rowsOffset, ui64 numberRows, const TVector>& parsedValues) { + DoFiltering(rowsOffset, numberRows, parsedValues); + }, parserConfig.GetBatchSizeBytes(), TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs()), parserConfig.GetBufferCellCount()); } catch (const NYql::NPureCalc::TCompileError& e) { FatalError(e.GetIssues(), nullptr, true); } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index 8f975e004463..b360d159d5ca 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -26,7 +26,9 @@ class TFixture : public NUnitTest::TBaseFixture { : PureCalcProgramFactory(CreatePureCalcProgramFactory()) , Runtime(true) , Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) - {} + { + Alloc.Ref().UseRefLocking = true; + } static void SegmentationFaultHandler(int) { Cerr << "segmentation fault call stack:" << Endl; @@ -69,7 
+71,11 @@ class TFixture : public NUnitTest::TBaseFixture { {.EnabledLLVM = false}); } - const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function valueCreator) { + void Push(const TVector& offsets, const TVector*>& values) { + Filter->Push(offsets, values, 0, values.front()->size()); + } + + const TVector* MakeVector(size_t size, std::function valueCreator) { with_lock (Alloc) { Holders.emplace_front(); for (size_t i = 0; i < size; ++i) { @@ -81,21 +87,21 @@ class TFixture : public NUnitTest::TBaseFixture { } template - const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(const TVector& values, bool optional = false) { + const TVector* MakeVector(const TVector& values, bool optional = false) { return MakeVector(values.size(), [&](size_t i) { NYql::NUdf::TUnboxedValuePod unboxedValue = NYql::NUdf::TUnboxedValuePod(values[i]); return optional ? unboxedValue.MakeOptional() : unboxedValue; }); } - const NKikimr::NMiniKQL::TUnboxedValueVector* MakeStringVector(const TVector& values, bool optional = false) { + const TVector* MakeStringVector(const TVector& values, bool optional = false) { return MakeVector(values.size(), [&](size_t i) { NYql::NUdf::TUnboxedValuePod stringValue = NKikimr::NMiniKQL::MakeString(values[i]); return optional ? stringValue.MakeOptional() : stringValue; }); } - const NKikimr::NMiniKQL::TUnboxedValueVector* MakeEmptyVector(size_t size) { + const TVector* MakeEmptyVector(size_t size) { return MakeVector(size, [&](size_t) { return NYql::NUdf::TUnboxedValuePod(); }); @@ -107,7 +113,7 @@ class TFixture : public NUnitTest::TBaseFixture { std::unique_ptr Filter; NKikimr::NMiniKQL::TScopedAlloc Alloc; - TList Holders; + TList> Holders; }; Y_UNIT_TEST_SUITE(TJsonFilterTests) { @@ -120,8 +126,8 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push({5}, {MakeStringVector({"hello1"}), MakeVector({99}), MakeStringVector({"zapuskaem"}, true)}); - Filter->Push({6}, {MakeStringVector({"hello2"}), MakeVector({101}), MakeStringVector({"gusya"}, true)}); + Push({5}, {MakeStringVector({"hello1"}), MakeVector({99}), MakeStringVector({"zapuskaem"}, true)}); + Push({6}, {MakeStringVector({"hello2"}), MakeVector({101}), MakeStringVector({"gusya"}, true)}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101,"a@3":"gusya"})", result[6]); } @@ -135,8 +141,8 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push({5}, {MakeVector({99}), MakeStringVector({"hello1"})}); - Filter->Push({6}, {MakeVector({101}), MakeStringVector({"hello2"})}); + Push({5}, {MakeVector({99}), MakeStringVector({"hello1"})}); + Push({6}, {MakeVector({101}), MakeStringVector({"hello2"})}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); } @@ -152,7 +158,7 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { }); const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; for (ui64 i = 0; i < 5; ++i) { - Filter->Push({2 * i, 2 * i + 1}, {MakeStringVector({"hello1", "hello2"}), MakeVector({99, 101}), MakeStringVector({largeString, largeString})}); + Push({2 * i, 2 * i + 1}, {MakeStringVector({"hello1", "hello2"}), MakeVector({99, 101}), MakeStringVector({largeString, largeString})}); UNIT_ASSERT_VALUES_EQUAL_C(i + 1, result.size(), i); UNIT_ASSERT_VALUES_EQUAL_C(TStringBuilder() << "{\"a1\":\"hello2\",\"a2\":101,\"a3\":\"" << largeString << "\"}", 
result[2 * i + 1], i); } @@ -167,10 +173,24 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push({5}, {MakeEmptyVector(1), MakeStringVector({"str"})}); + Push({5}, {MakeEmptyVector(1), MakeStringVector({"str"})}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":null,"a2":"str"})", result[5]); } + + Y_UNIT_TEST_F(PartialPush, TFixture) { + TMap result; + MakeFilter( + {"a1", "a2", "a@3"}, + {"[DataType; String]", "[DataType; Uint64]", "[OptionalType; [DataType; String]]"}, + "where a2 > 50", + [&](ui64 offset, const TString& json) { + result[offset] = json; + }); + Filter->Push({5, 6, 7}, {MakeStringVector({"hello1", "hello2"}), MakeVector({99, 101}), MakeStringVector({"zapuskaem", "gusya"}, true)}, 1, 1); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello1","a2":99,"a@3":"zapuskaem"})", result[6]); + } } } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index a3859130962f..c01ce5ae7601 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -46,20 +46,25 @@ class TFixture : public NUnitTest::TBaseFixture { } } + void MakeParser(TVector columns, TVector types, TJsonParser::TCallback callback, ui64 batchSize = 1_MB, ui64 bufferCellCount = 1000) { + Parser = NFq::NewJsonParser(columns, types, callback, batchSize, TDuration::Hours(1), bufferCellCount); + } + + void MakeParser(TVector columns, TJsonParser::TCallback callback) { + MakeParser(columns, TVector(columns.size(), "[DataType; String]"), callback); + } + void MakeParser(TVector columns, TVector types) { - Parser = NFq::NewJsonParser(columns, types, 0, TDuration::Zero()); + MakeParser(columns, types, [](ui64, ui64, const TVector>&) {}); } void MakeParser(TVector columns) { MakeParser(columns, TVector(columns.size(), "[DataType; String]")); } - const TVector& PushToParser(ui64 offset, const TString& data) { + void PushToParser(ui64 offset, const TString& data) { Parser->AddMessages({GetMessage(offset, data)}); - - const auto& parsedValues = Parser->Parse(); - ResultNumberValues = parsedValues ? 
parsedValues.front().size() : 0; - return parsedValues; + Parser->Parse(); } static NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage GetMessage(ui64 offset, const TString& data) { @@ -70,118 +75,137 @@ class TFixture : public NUnitTest::TBaseFixture { TActorSystemStub actorSystemStub; NActors::TTestActorRuntime Runtime; std::unique_ptr Parser; - ui64 ResultNumberValues = 0; }; Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(Simple1, TFixture) { - MakeParser({"a1", "a2"}, {"[DataType; String]", "[OptionalType; [DataType; Uint64]]"}); - const auto& result = PushToParser(42,R"({"a1": "hello1", "a2": 101, "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); - - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL(101, result[1][0].GetOptionalValue().Get()); + MakeParser({"a1", "a2"}, {"[DataType; String]", "[OptionalType; [DataType; Uint64]]"}, [](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(1, numberRows); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(101, result[1][0].GetOptionalValue().Get()); + }); + PushToParser(42,R"({"a1": "hello1", "a2": 101, "event": "event1"})"); } Y_UNIT_TEST_F(Simple2, TFixture) { - MakeParser({"a2", "a1"}); - const auto& result = PushToParser(42,R"({"a1": "hello1", "a2": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); - - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); + MakeParser({"a2", "a1"}, [](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(1, numberRows); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); + }); + PushToParser(42,R"({"a1": "hello1", "a2": "101", "event": "event1"})"); } Y_UNIT_TEST_F(Simple3, TFixture) { - MakeParser({"a1", "a2"}); - const auto& result = PushToParser(42,R"({"a2": "hello1", "a1": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); - - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); + MakeParser({"a1", "a2"}, [](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(1, numberRows); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); + }); + PushToParser(42,R"({"a2": "hello1", "a1": "101", "event": "event1"})"); } Y_UNIT_TEST_F(Simple4, TFixture) { - MakeParser({"a2", "a1"}); - const auto& result = PushToParser(42, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); - - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("101", TString(result[1][0].AsStringRef())); + MakeParser({"a2", "a1"}, 
[](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(1, numberRows); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[1][0].AsStringRef())); + }); + PushToParser(42, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); } Y_UNIT_TEST_F(LargeStrings, TFixture) { - MakeParser({"col"}); - const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; + + MakeParser({"col"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(2, numberRows); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][1].AsStringRef())); + }); + const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; Parser->AddMessages({ GetMessage(42, jsonString), GetMessage(43, jsonString) }); - - const auto& result = Parser->Parse(); - ResultNumberValues = result.front().size(); - UNIT_ASSERT_VALUES_EQUAL(2, ResultNumberValues); - - UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][1].AsStringRef())); + Parser->Parse(); } Y_UNIT_TEST_F(ManyValues, TFixture) { - MakeParser({"a1", "a2"}); + MakeParser({"a1", "a2"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(3, numberRows); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + for (size_t i = 0; i < numberRows; ++i) { + UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(result[0][i].AsStringRef()), i); + UNIT_ASSERT_VALUES_EQUAL_C("101", TString(result[1][i].AsStringRef()), i); + } + }); Parser->AddMessages({ GetMessage(42, R"({"a1": "hello1", "a2": "101", "event": "event1"})"), GetMessage(43, R"({"a1": "hello1", "a2": "101", "event": "event2"})"), GetMessage(44, R"({"a2": "101", "a1": "hello1", "event": "event3"})") }); - - const auto& result = Parser->Parse(); - ResultNumberValues = result.front().size(); - UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues); - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - for (size_t i = 0; i < ResultNumberValues; ++i) { - UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(result[0][i].AsStringRef()), i); - UNIT_ASSERT_VALUES_EQUAL_C("101", TString(result[1][i].AsStringRef()), i); - } + Parser->Parse(); } Y_UNIT_TEST_F(MissingFields, TFixture) { - MakeParser({"a1", "a2"}, {"[OptionalType; [DataType; String]]", "[OptionalType; [DataType; Uint64]]"}); + MakeParser({"a1", "a2"}, {"[OptionalType; [DataType; String]]", "[OptionalType; [DataType; Uint64]]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(3, numberRows); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + for (size_t i = 0; i < numberRows; ++i) { + if (i == 2) { + UNIT_ASSERT_C(!result[0][i], i); + } else { + NYql::NUdf::TUnboxedValue value = result[0][i].GetOptionalValue(); + UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(value.AsStringRef()), i); + } + if (i == 1) { + UNIT_ASSERT_C(!result[1][i], i); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(101, result[1][i].GetOptionalValue().Get(), i); + } + } + }); Parser->AddMessages({ GetMessage(42, R"({"a1": "hello1", "a2": 
101 , "event": "event1"})"), GetMessage(43, R"({"a1": "hello1", "event": "event2"})"), GetMessage(44, R"({"a2": "101", "a1": null, "event": "event3"})") }); - - const auto& result = Parser->Parse(); - ResultNumberValues = result.front().size(); - UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues); - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - for (size_t i = 0; i < ResultNumberValues; ++i) { - if (i == 2) { - UNIT_ASSERT_C(!result[0][i], i); - } else { - NYql::NUdf::TUnboxedValue value = result[0][i].GetOptionalValue(); - UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(value.AsStringRef()), i); - } - if (i == 1) { - UNIT_ASSERT_C(!result[1][i], i); - } else { - UNIT_ASSERT_VALUES_EQUAL_C(101, result[1][i].GetOptionalValue().Get(), i); - } - } + Parser->Parse(); } Y_UNIT_TEST_F(NestedTypes, TFixture) { - MakeParser({"nested", "a1"}, {"[OptionalType; [DataType; Json]]", "[DataType; String]"}); + MakeParser({"nested", "a1"}, {"[OptionalType; [DataType; Json]]", "[DataType; String]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(4, numberRows); + + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); + + UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", TString(result[0][1].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello2", TString(result[1][1].AsStringRef())); + + UNIT_ASSERT_VALUES_EQUAL("\"some string\"", TString(result[0][2].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello3", TString(result[1][2].AsStringRef())); + + UNIT_ASSERT_VALUES_EQUAL("123456", TString(result[0][3].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello4", TString(result[1][3].AsStringRef())); + }); Parser->AddMessages({ GetMessage(42, R"({"a1": "hello1", "nested": {"key": "value"}})"), @@ -189,39 +213,68 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { GetMessage(43, R"({"a1": "hello3", "nested": "some string"})"), GetMessage(43, R"({"a1": "hello4", "nested": 123456})") }); + Parser->Parse(); + } - const auto& result = Parser->Parse(); - ResultNumberValues = result.front().size(); - UNIT_ASSERT_VALUES_EQUAL(4, ResultNumberValues); + Y_UNIT_TEST_F(SimpleBooleans, TFixture) { + MakeParser({"a"}, {"[DataType; Bool]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(2, numberRows); - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", TString(result[0][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(false, result[0][1].Get()); + }); - UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", TString(result[0][1].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello2", TString(result[1][1].AsStringRef())); + Parser->AddMessages({ + GetMessage(42, R"({"a": true})"), + GetMessage(43, R"({"a": false})") + }); + Parser->Parse(); + } - UNIT_ASSERT_VALUES_EQUAL("\"some string\"", TString(result[0][2].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello3", TString(result[1][2].AsStringRef())); + Y_UNIT_TEST_F(ManyBatches, TFixture) { + const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; - UNIT_ASSERT_VALUES_EQUAL("123456", TString(result[0][3].AsStringRef())); - 
UNIT_ASSERT_VALUES_EQUAL("hello4", TString(result[1][3].AsStringRef())); - } + ui64 currentOffset = 0; + MakeParser({"col"}, {"[DataType; String]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(currentOffset, rowsOffset); + currentOffset++; - Y_UNIT_TEST_F(SimpleBooleans, TFixture) { - MakeParser({"a"}, {"[DataType; Bool]"}); + UNIT_ASSERT_VALUES_EQUAL(1, numberRows); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); + }, 1_MB, 1); + + const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; Parser->AddMessages({ - GetMessage(42, R"({"a": true})"), - GetMessage(43, R"({"a": false})") + GetMessage(42, jsonString), + GetMessage(43, jsonString) }); + Parser->Parse(); + } - const auto& result = Parser->Parse(); - ResultNumberValues = result.front().size(); - UNIT_ASSERT_VALUES_EQUAL(2, ResultNumberValues); + Y_UNIT_TEST_F(LittleBatches, TFixture) { + const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; - UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get()); - UNIT_ASSERT_VALUES_EQUAL(false, result[0][1].Get()); + ui64 currentOffset = 42; + MakeParser({"col"}, {"[DataType; String]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(Parser->GetOffsets().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(Parser->GetOffsets().front(), currentOffset); + currentOffset++; + + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(1, numberRows); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); + }, 10); + + const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; + Parser->AddMessages({ + GetMessage(42, jsonString), + GetMessage(43, jsonString) + }); + UNIT_ASSERT_VALUES_EQUAL(Parser->GetNumberValues(), 0); } Y_UNIT_TEST_F(MissingFieldsValidation, TFixture) {