Skip to content

Commit

Permalink
YQ-3869 RD added limit for parser buffer size (#11627)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Nov 18, 2024
1 parent 3e28059 commit a9441cb
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 207 deletions.
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/config/protos/row_dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ message TRowDispatcherCoordinatorConfig {
}

// Settings of the JSON parser: batch size, batch creation timeout and buffer size limit.
message TJsonParserConfig {
uint64 BatchSizeBytes = 1;
uint64 BatchSizeBytes = 1; // default 1 MiB
uint64 BatchCreationTimeoutMs = 2; // NOTE(review): presumably the timeout, in milliseconds, for creating a batch -- confirm
uint64 BufferCellCount = 3; // limit on (number of rows) * (number of columns), default 10^6
}

message TRowDispatcherConfig {
Expand Down
49 changes: 31 additions & 18 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ NYT::TNode MakeOutputSchema() {
return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
}

// Non-owning description of a batch of rows passed to the filter consumer.
// All containers are held by reference, so they must outlive this object.
struct TInputType {
    // Row offsets; the entry for batch row `rowId` is stored at index `RowsOffset + rowId`.
    const TVector<ui64>& Offsets;
    // One pointer per column; each column is a vector of row values.
    const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& Values;
    // Index in Offsets of the first value of this batch.
    const ui64 RowsOffset;
    // Number of rows in this batch.
    const ui64 NumberRows;

    // Returns the offset stored for the batch-relative row index `rowId`.
    ui64 GetOffset(ui64 rowId) const {
        const ui64 absoluteIndex = RowsOffset + rowId;
        return Offsets[absoluteIndex];
    }
};

class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
public:
TFilterInputSpec(const NYT::TNode& schema)
Expand All @@ -85,7 +96,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
TVector<NYT::TNode> Schemas;
};

class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>> {
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<TInputType> {
public:
TFilterInputConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -123,36 +134,38 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
}
}

void OnObject(std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&> values) override {
Y_ENSURE(FieldsPositions.size() == values.second.size());
void OnObject(TInputType input) override {
Y_ENSURE(FieldsPositions.size() == input.Values.size());

NKikimr::NMiniKQL::TThrowingBindTerminator bind;
with_lock (Worker->GetScopedAlloc()) {
Y_DEFER {
// Clear cache after each object because
// values are allocated on another allocator and should be released
Cache.Clear();
Worker->GetGraph().Invalidate();
};

auto& holderFactory = Worker->GetGraph().GetHolderFactory();

// TODO: use blocks here
for (size_t rowId = 0; rowId < values.second.front()->size(); ++rowId) {
for (size_t rowId = 0; rowId < input.NumberRows; ++rowId) {
NYql::NUdf::TUnboxedValue* items = nullptr;

NYql::NUdf::TUnboxedValue result = Cache.NewArray(
holderFactory,
static_cast<ui32>(values.second.size() + 1),
static_cast<ui32>(input.Values.size() + 1),
items);

items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first[rowId]);
items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(input.GetOffset(rowId));

size_t fieldId = 0;
for (const auto& column : values.second) {
for (const auto column : input.Values) {
items[FieldsPositions[fieldId++]] = column->at(rowId);
}

Worker->Push(std::move(result));
}

// Clear cache after each object because
// values are allocated on another allocator and should be released
Cache.Clear();
Worker->GetGraph().Invalidate();
}
}

Expand Down Expand Up @@ -236,7 +249,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
static constexpr bool IsPartial = false;
static constexpr bool SupportPushStreamMode = true;

using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>>;
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<TInputType>>;

static TConsumerType MakeConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -282,9 +295,9 @@ class TJsonFilter::TImpl {
LOG_ROW_DISPATCHER_DEBUG("Program created");
}

void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
void Push(const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 rowsOffset, ui64 numberRows) {
Y_ENSURE(values, "Expected non empty schema");
InputConsumer->OnObject(std::make_pair(offsets, values));
InputConsumer->OnObject({.Offsets = offsets, .Values = values, .RowsOffset = rowsOffset, .NumberRows = numberRows});
}

TString GetSql() const {
Expand All @@ -305,7 +318,7 @@ class TJsonFilter::TImpl {

private:
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>> InputConsumer;
THolder<NYql::NPureCalc::IConsumer<TInputType>> InputConsumer;
const TString Sql;
};

Expand All @@ -322,8 +335,8 @@ TJsonFilter::TJsonFilter(
// Out-of-line destructor with an empty body.
// NOTE(review): presumably defined in this file (not the header) so that TImpl
// is a complete type at the point where Impl is destroyed -- confirm.
TJsonFilter::~TJsonFilter() {
}

void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
Impl->Push(offsets, values);
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 rowsOffset, ui64 numberRows) {
Impl->Push(offsets, values, rowsOffset, numberRows);
}

TString TJsonFilter::GetSql() {
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#include "common.h"

#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <yql/essentials/public/udf/udf_data_type.h>
#include <yql/essentials/public/udf/udf_value.h>

namespace NFq {
Expand All @@ -23,7 +21,7 @@ class TJsonFilter {

~TJsonFilter();

void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values);
void Push(const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 rowsOffset, ui64 numberRows);
TString GetSql();

private:
Expand Down
Loading

0 comments on commit a9441cb

Please sign in to comment.