Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce TBlockJoinState #8368

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 146 additions & 194 deletions ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,154 @@ size_t CalcMaxBlockLength(const TVector<TType*>& items) {
}));
}

// Per-node computation state for the block map join.
// Holds the fetched input block columns, per-column readers/converters, and
// output array builders; finished blocks are handed off to the inherited
// TBlockState machinery (Values / FillArrays / Slice / Get).
//
// RightRequired is a compile-time join-flavor switch (see MakeRow):
//   - true:  the right-hand dict value is always present, so its elements are
//            unconditionally appended to the right output columns;
//   - false: the right-hand value may be empty, in which case the (empty)
//            value itself is appended to each right output column.
template <bool RightRequired>
class TBlockJoinState : public TBlockState {
public:
// inputItems/outputItems are block types whose LAST element is the scalar
// block-length column — hence the "- 1" when computing the widths below.
// `fields` is wired so the fetcher writes incoming blocks straight into
// Inputs_.
TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
const TVector<TType*>& inputItems,
const TVector<TType*> outputItems,
NUdf::TUnboxedValue**const fields)
: TBlockState(memInfo, outputItems.size())
, InputWidth_(inputItems.size() - 1)
, OutputWidth_(outputItems.size() - 1)
, Inputs_(inputItems.size())
, InputsDescr_(ToValueDescr(inputItems))
{
const auto& pgBuilder = ctx.Builder->GetPgBuilder();
MaxLength_ = CalcMaxBlockLength(outputItems);
for (size_t i = 0; i < inputItems.size(); i++) {
// Point the wide-fields slot at our storage so fetched blocks land in Inputs_.
fields[i] = &Inputs_[i];
const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
}
// The last output column (i.e. block length) doesn't require a block builder.
for (size_t i = 0; i < OutputWidth_; i++) {
const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType();
Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_));
}
// MakeArrayBuilder has accumulated the builders' initial allocation into
// BuilderAllocatedSize_; cap further growth at a fixed multiple of that.
MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_;
}

// Append the current input row's left-flow items to the output builders
// (used when no right-hand columns are produced for this row).
void CopyRow() {
// Copy items from the "left" flow.
for (size_t i = 0; i < InputWidth_; i++) {
AddItem(GetItem(i), i);
}
OutputRows_++;
}

// Append a full output row: the current left-flow items plus the elements
// of `value` (the matched "right" dict payload) for the right columns.
// When RightRequired is false and `value` is empty, the empty value itself
// is appended to each right column.
void MakeRow(const NUdf::TUnboxedValuePod& value) {
// Copy items from the "left" flow.
for (size_t i = 0; i < InputWidth_; i++) {
AddItem(GetItem(i), i);
}
// Convert and append items from the "right" dict.
if constexpr (RightRequired) {
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
AddValue(value.GetElement(j), i);
}
} else {
if (value) {
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
AddValue(value.GetElement(j), i);
}
} else {
for (size_t i = InputWidth_; i < OutputWidth_; i++) {
AddValue(value, i);
}
}
}
OutputRows_++;
}

// Flush the accumulated rows: store the row count as the trailing
// block-length scalar, build every column into an arrow block, reset the
// row/allocation counters, and let TBlockState::FillArrays prepare the
// built arrays for slicing.
void MakeBlocks(const THolderFactory& holderFactory) {
Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputRows_)));
OutputRows_ = 0;
BuilderAllocatedSize_ = 0;

for (size_t i = 0; i < Builders_.size(); i++) {
Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(IsFinished_));
}
FillArrays();
}

// Read input column `idx` at the current row; scalar blocks are read as-is,
// array blocks are indexed by Current_.
TBlockItem GetItem(size_t idx) const {
const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
if (datum.is_scalar()) {
return Readers_[idx]->GetScalarItem(*datum.scalar());
}
MKQL_ENSURE(datum.is_array(), "Expecting array");
return Readers_[idx]->GetItem(*datum.array(), Current_);
}

// Read input column `idx` at the current row and box it as an unboxed value
// (used e.g. for dict lookups on the key columns).
NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
return Converters_[idx]->MakeValue(GetItem(idx), holderFactory);
}

// Start iterating a freshly fetched input block; the row count comes from
// the trailing block-length column.
void Reset() {
Next_ = 0;
InputRows_ = GetBlockCount(Inputs_.back());
}

// Mark the upstream flow as exhausted (also makes builders finalize on Build).
void Finish() {
IsFinished_ = true;
}

// Advance to the next input row; returns false when the block is exhausted.
bool NextRow() {
if (Next_ >= InputRows_) {
return false;
}
Current_ = Next_++;
return true;
}

// True while more rows may be appended: under the max block length and
// under the builder allocation budget.
bool IsNotFull() const {
return OutputRows_ < MaxLength_
&& BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
}

bool IsEmpty() const {
return OutputRows_ == 0;
}

bool IsFinished() const {
return IsFinished_;
}

private:
void AddItem(const TBlockItem& item, size_t idx) {
Builders_[idx]->Add(item);
}

void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) {
Builders_[idx]->Add(value);
}

size_t Current_ = 0;                 // index of the row being processed
size_t Next_ = 0;                    // index of the next row to hand out
bool IsFinished_ = false;
size_t MaxLength_;                   // max rows per output block
size_t BuilderAllocatedSize_ = 0;    // bytes currently allocated by builders
size_t MaxBuilderAllocatedSize_ = 0; // allocation budget before a flush
static const size_t MaxAllocatedFactor_ = 4;
size_t InputRows_ = 0;               // rows in the current input block
size_t OutputRows_ = 0;              // rows buffered but not yet flushed
size_t InputWidth_;                  // input columns, excluding block length
size_t OutputWidth_;                 // output columns, excluding block length
TUnboxedValueVector Inputs_;         // fetched input blocks (last = length)
const std::vector<arrow::ValueDescr> InputsDescr_;
TVector<std::unique_ptr<IBlockReader>> Readers_;
TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
TVector<std::unique_ptr<IArrayBuilder>> Builders_;
};

template <bool WithoutRight, bool RightRequired>
class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>
{
using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>;
using TState = TBlockJoinState<RightRequired>;
public:
TBlockWideMapJoinWrapper(TComputationMutables& mutables,
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
Expand All @@ -36,7 +180,7 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapp
, LeftKeyColumns_(std::move(leftKeyColumns))
, Flow_(flow)
, Dict_(dict)
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(ResultJoinItems_.size()))
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
{}

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
Expand Down Expand Up @@ -79,7 +223,7 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapp
if (s.IsEmpty()) {
return EFetchResult::Finish;
}
s.MakeBlocks();
s.MakeBlocks(ctx.HolderFactory);
const auto sliceSize = s.Slice();

for (size_t i = 0; i < ResultJoinItems_.size(); i++) {
Expand All @@ -98,198 +242,6 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapp
}
}

// (Pre-refactoring state class, superseded by TBlockJoinState.)
// Buffers join output rows in array builders and slices the built arrays
// into output blocks via its own Deques/Arrays machinery.
class TState : public TComputationValue<TState> {
using TBase = TComputationValue<TState>;
size_t Current_ = 0;                 // index of the row being processed
size_t Next_ = 0;                    // index of the next row to hand out
bool IsFinished_ = false;
size_t MaxLength_;                   // max rows per output block
size_t BuilderAllocatedSize_ = 0;    // bytes currently allocated by builders
size_t MaxBuilderAllocatedSize_ = 0; // allocation budget before a flush
static const size_t MaxAllocatedFactor_ = 4;
size_t InputRows_ = 0;               // rows in the current input block
size_t OutputRows_ = 0;              // rows buffered but not yet flushed
size_t InputWidth_;                  // input columns, excluding block length
size_t OutputWidth_;                 // output columns, excluding block length
TUnboxedValueVector Inputs_;         // fetched input blocks (last = length)
const std::vector<arrow::ValueDescr> InputsDescr_;
TVector<std::deque<std::shared_ptr<arrow::ArrayData>>> Deques; // built chunks per column, pending slicing
TVector<std::shared_ptr<arrow::ArrayData>> Arrays;             // current slice per column
TVector<std::unique_ptr<IBlockReader>> Readers_;
TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
TVector<std::unique_ptr<IArrayBuilder>> Builders_;

public:
// inputItems/outputItems are block types whose LAST element is the scalar
// block-length column — hence the "- 1" when computing the widths below.
// `fields` is wired so the fetcher writes incoming blocks straight into Inputs_.
TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
const TVector<TType*>& inputItems, const TVector<TType*> outputItems,
NUdf::TUnboxedValue**const fields)
: TBase(memInfo)
, InputWidth_(inputItems.size() - 1)
, OutputWidth_(outputItems.size() - 1)
, Inputs_(inputItems.size())
, InputsDescr_(ToValueDescr(inputItems))
, Deques(OutputWidth_)
, Arrays(OutputWidth_)
{
const auto& pgBuilder = ctx.Builder->GetPgBuilder();
MaxLength_ = CalcMaxBlockLength(outputItems);
for (size_t i = 0; i < inputItems.size(); i++) {
fields[i] = &Inputs_[i];
const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
}
// The last output column (i.e. block length) doesn't require a block builder.
for (size_t i = 0; i < OutputWidth_; i++) {
const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType();
Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_));
}
// Cap builder growth at a fixed multiple of the initial allocation.
MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_;
}

// Start iterating a freshly fetched input block; the row count comes from
// the trailing block-length column.
void Reset() {
Next_ = 0;
InputRows_ = GetBlockCount(Inputs_.back());
}

// Mark the upstream flow as exhausted.
void Finish() {
IsFinished_ = true;
}

// Advance to the next input row; returns false when the block is exhausted.
bool NextRow() {
if (Next_ >= InputRows_) {
return false;
}
Current_ = Next_++;
return true;
}

// True while more rows may be appended: under the max block length and
// under the builder allocation budget.
bool IsNotFull() {
return OutputRows_ < MaxLength_
&& BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
}

bool IsEmpty() {
return OutputRows_ == 0;
}

bool IsFinished() {
return IsFinished_;
}

// Read input column `idx` at the current row; scalar blocks are read as-is,
// array blocks are indexed by Current_.
TBlockItem GetItem(size_t idx) const {
const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
if (datum.is_scalar()) {
return Readers_[idx]->GetScalarItem(*datum.scalar());
}
MKQL_ENSURE(datum.is_array(), "Expecting array");
return Readers_[idx]->GetItem(*datum.array(), Current_);
}

// Read input column `idx` at the current row and box it as an unboxed value.
NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
return Converters_[idx]->MakeValue(GetItem(idx), holderFactory);
}

void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) {
Builders_[idx]->Add(value);
}

void AddItem(const TBlockItem& item, size_t idx) {
Builders_[idx]->Add(item);
}

// Append the current input row's left-flow items to the output builders.
void CopyRow() {
// Copy items from the "left" flow.
for (size_t i = 0; i < InputWidth_; i++) {
AddItem(GetItem(i), i);
}
OutputRows_++;
}

// Append a full output row: left-flow items plus the elements of `value`
// (the matched "right" dict payload); an empty `value` is appended as-is
// to each right column.
void MakeRow(const NUdf::TUnboxedValuePod& value) {
// Copy items from the "left" flow.
for (size_t i = 0; i < InputWidth_; i++) {
AddItem(GetItem(i), i);
}
// Convert and append items from the "right" dict.
if (value) {
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
AddValue(value.GetElement(j), i);
}
} else {
for (size_t i = InputWidth_; i < OutputWidth_; i++) {
AddValue(value, i);
}
}
OutputRows_++;
}

// Bulk-copy `popCount` rows of input column `idx` (selected by the sparse
// bitmap) straight into the matching output builder; input must be an array.
void CopyArray(size_t idx, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) {
const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
Y_ENSURE(datum.is_array());
Builders_[idx]->AddMany(*datum.array(), popCount, sparseBitmap, bitmapSize);
}

// Build every column's buffered rows and queue the resulting array chunks
// into Deques for slicing; no-op when nothing is buffered.
void MakeBlocks() {
if (OutputRows_ == 0) {
return;
}
BuilderAllocatedSize_ = 0;

for (size_t i = 0; i < Builders_.size(); i++) {
const auto& datum = Builders_[i]->Build(IsFinished_);
Deques[i].clear();
MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
ForEachArrayData(datum, [this, i](const auto& arrayData) {
Deques[i].push_back(arrayData);
});
}
}

// Cut the next output slice: its size is the shortest head chunk across the
// columns (capped by OutputRows_); each column's head is moved (or chopped)
// into Arrays. Returns the slice length and deducts it from OutputRows_.
ui64 Slice() {
auto sliceSize = OutputRows_;
for (size_t i = 0; i < Deques.size(); i++) {
const auto& arrays = Deques[i];
if (arrays.empty()) {
continue;
}
Y_ABORT_UNLESS(ui64(arrays.front()->length) <= OutputRows_);
sliceSize = std::min<ui64>(sliceSize, arrays.front()->length);
}

for (size_t i = 0; i < Arrays.size(); i++) {
auto& arrays = Deques[i];
if (arrays.empty()) {
continue;
}
if (auto& head = arrays.front(); ui64(head->length) == sliceSize) {
Arrays[i] = std::move(head);
arrays.pop_front();
} else {
Arrays[i] = Chop(head, sliceSize);
}
}

OutputRows_ -= sliceSize;
return sliceSize;
}

// Fetch output column `idx` for the current slice; index OutputWidth_
// yields the block-length scalar, an absent array yields an empty value.
NUdf::TUnboxedValuePod Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const {
MKQL_ENSURE(idx <= OutputWidth_, "Deques index overflow");
// Return the slice length as the last column value (i.e. block length).
if (idx == OutputWidth_) {
return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
}
if (auto array = Arrays[idx]) {
return holderFactory.CreateArrowBlock(std::move(array));
} else {
return NUdf::TUnboxedValuePod();
}
}

};

void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
}
Expand Down
10 changes: 3 additions & 7 deletions ydb/library/yql/minikql/computation/mkql_block_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,9 @@ void TBlockState::FillArrays() {
return;
}
MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
if (datum.is_array()) {
Deques[i].push_back(datum.array());
} else {
for (auto& chunk : datum.chunks()) {
Deques[i].push_back(chunk->data());
}
}
ForEachArrayData(datum, [this, i](const auto& arrayData) {
Deques[i].push_back(arrayData);
});
}
}
}
Expand Down
Loading