Skip to content

Commit

Permalink
Introduce TBlockJoinState (#8368)
Browse files Browse the repository at this point in the history
  • Loading branch information
igormunkin authored Aug 29, 2024
1 parent 6d41d83 commit 47066e9
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 201 deletions.
340 changes: 146 additions & 194 deletions ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,154 @@ size_t CalcMaxBlockLength(const TVector<TType*>& items) {
}));
}

template <bool RightRequired>
class TBlockJoinState : public TBlockState {
public:
TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
const TVector<TType*>& inputItems,
const TVector<TType*> outputItems,
NUdf::TUnboxedValue**const fields)
: TBlockState(memInfo, outputItems.size())
, InputWidth_(inputItems.size() - 1)
, OutputWidth_(outputItems.size() - 1)
, Inputs_(inputItems.size())
, InputsDescr_(ToValueDescr(inputItems))
{
const auto& pgBuilder = ctx.Builder->GetPgBuilder();
MaxLength_ = CalcMaxBlockLength(outputItems);
for (size_t i = 0; i < inputItems.size(); i++) {
fields[i] = &Inputs_[i];
const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
}
// The last output column (i.e. block length) doesn't require a block builder.
for (size_t i = 0; i < OutputWidth_; i++) {
const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType();
Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_));
}
MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_;
}

void CopyRow() {
// Copy items from the "left" flow.
for (size_t i = 0; i < InputWidth_; i++) {
AddItem(GetItem(i), i);
}
OutputRows_++;
}

void MakeRow(const NUdf::TUnboxedValuePod& value) {
// Copy items from the "left" flow.
for (size_t i = 0; i < InputWidth_; i++) {
AddItem(GetItem(i), i);
}
// Convert and append items from the "right" dict.
if constexpr (RightRequired) {
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
AddValue(value.GetElement(j), i);
}
} else {
if (value) {
for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
AddValue(value.GetElement(j), i);
}
} else {
for (size_t i = InputWidth_; i < OutputWidth_; i++) {
AddValue(value, i);
}
}
}
OutputRows_++;
}

void MakeBlocks(const THolderFactory& holderFactory) {
Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputRows_)));
OutputRows_ = 0;
BuilderAllocatedSize_ = 0;

for (size_t i = 0; i < Builders_.size(); i++) {
Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(IsFinished_));
}
FillArrays();
}

TBlockItem GetItem(size_t idx) const {
const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
if (datum.is_scalar()) {
return Readers_[idx]->GetScalarItem(*datum.scalar());
}
MKQL_ENSURE(datum.is_array(), "Expecting array");
return Readers_[idx]->GetItem(*datum.array(), Current_);
}

NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
return Converters_[idx]->MakeValue(GetItem(idx), holderFactory);
}

void Reset() {
Next_ = 0;
InputRows_ = GetBlockCount(Inputs_.back());
}

void Finish() {
IsFinished_ = true;
}

bool NextRow() {
if (Next_ >= InputRows_) {
return false;
}
Current_ = Next_++;
return true;
}

bool IsNotFull() const {
return OutputRows_ < MaxLength_
&& BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
}

bool IsEmpty() const {
return OutputRows_ == 0;
}

bool IsFinished() const {
return IsFinished_;
}

private:
void AddItem(const TBlockItem& item, size_t idx) {
Builders_[idx]->Add(item);
}

void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) {
Builders_[idx]->Add(value);
}

size_t Current_ = 0;
size_t Next_ = 0;
bool IsFinished_ = false;
size_t MaxLength_;
size_t BuilderAllocatedSize_ = 0;
size_t MaxBuilderAllocatedSize_ = 0;
static const size_t MaxAllocatedFactor_ = 4;
size_t InputRows_ = 0;
size_t OutputRows_ = 0;
size_t InputWidth_;
size_t OutputWidth_;
TUnboxedValueVector Inputs_;
const std::vector<arrow::ValueDescr> InputsDescr_;
TVector<std::unique_ptr<IBlockReader>> Readers_;
TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
TVector<std::unique_ptr<IArrayBuilder>> Builders_;
};

template <bool WithoutRight, bool RightRequired>
class TBlockWideMapJoinWrapper : public TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>
{
using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired>>;
using TState = TBlockJoinState<RightRequired>;
public:
TBlockWideMapJoinWrapper(TComputationMutables& mutables,
const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftFlowItems,
Expand All @@ -36,7 +180,7 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapp
, LeftKeyColumns_(std::move(leftKeyColumns))
, Flow_(flow)
, Dict_(dict)
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(ResultJoinItems_.size()))
, WideFieldsIndex_(mutables.IncrementWideFieldsIndex(LeftFlowItems_.size()))
{}

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
Expand Down Expand Up @@ -79,7 +223,7 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapp
if (s.IsEmpty()) {
return EFetchResult::Finish;
}
s.MakeBlocks();
s.MakeBlocks(ctx.HolderFactory);
const auto sliceSize = s.Slice();

for (size_t i = 0; i < ResultJoinItems_.size(); i++) {
Expand All @@ -98,198 +242,6 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TBlockWideMapJoinWrapp
}
}

// Legacy per-node join state (removed by this commit in favour of the
// shared TBlockJoinState): buffers rows from the "left" flow, joined with
// "right" dict values, into per-column array builders and slices the built
// Arrow arrays back out as uniform output blocks.
class TState : public TComputationValue<TState> {
    using TBase = TComputationValue<TState>;
    size_t Current_ = 0;                 // Row currently being read.
    size_t Next_ = 0;                    // Next row to read from the input block.
    bool IsFinished_ = false;
    size_t MaxLength_;                   // Max rows per output block.
    size_t BuilderAllocatedSize_ = 0;    // Updated by the array builders.
    size_t MaxBuilderAllocatedSize_ = 0;
    static const size_t MaxAllocatedFactor_ = 4;  // Allowed growth over the initial builder allocation.
    size_t InputRows_ = 0;               // Rows in the current input block.
    size_t OutputRows_ = 0;              // Rows accumulated in the builders.
    size_t InputWidth_;                  // Input columns, sans block length.
    size_t OutputWidth_;                 // Output columns, sans block length.
    TUnboxedValueVector Inputs_;
    const std::vector<arrow::ValueDescr> InputsDescr_;
    // Built arrays awaiting slicing, one deque per output column.
    TVector<std::deque<std::shared_ptr<arrow::ArrayData>>> Deques;
    // The current slice, one array per output column.
    TVector<std::shared_ptr<arrow::ArrayData>> Arrays;
    TVector<std::unique_ptr<IBlockReader>> Readers_;
    TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
    TVector<std::unique_ptr<IArrayBuilder>> Builders_;

public:
    TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
           const TVector<TType*>& inputItems, const TVector<TType*> outputItems,
           NUdf::TUnboxedValue**const fields)
        : TBase(memInfo)
        , InputWidth_(inputItems.size() - 1)
        , OutputWidth_(outputItems.size() - 1)
        , Inputs_(inputItems.size())
        , InputsDescr_(ToValueDescr(inputItems))
        , Deques(OutputWidth_)
        , Arrays(OutputWidth_)
    {
        const auto& pgBuilder = ctx.Builder->GetPgBuilder();
        MaxLength_ = CalcMaxBlockLength(outputItems);
        // Wire the wide-flow output fields to our input slots and create a
        // reader/converter pair per input column (including block length).
        for (size_t i = 0; i < inputItems.size(); i++) {
            fields[i] = &Inputs_[i];
            const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
            Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
            Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
        }
        // The last output column (i.e. block length) doesn't require a block builder.
        for (size_t i = 0; i < OutputWidth_; i++) {
            const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType();
            Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_));
        }
        // Cap builder growth relative to their initial allocated size.
        MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_;
    }

    // Rewind to the beginning of a freshly fetched input block.
    void Reset() {
        Next_ = 0;
        InputRows_ = GetBlockCount(Inputs_.back());
    }

    void Finish() {
        IsFinished_ = true;
    }

    // Advance to the next row of the input block; false when exhausted.
    bool NextRow() {
        if (Next_ >= InputRows_) {
            return false;
        }
        Current_ = Next_++;
        return true;
    }

    // True while the builders can accept more rows (both by row count and by
    // allocated-memory budget).
    bool IsNotFull() {
        return OutputRows_ < MaxLength_
            && BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
    }

    bool IsEmpty() {
        return OutputRows_ == 0;
    }

    bool IsFinished() {
        return IsFinished_;
    }

    // Read the item at the current row of the given input column,
    // transparently handling scalar (broadcast) datums.
    TBlockItem GetItem(size_t idx) const {
        const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
        ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
        if (datum.is_scalar()) {
            return Readers_[idx]->GetScalarItem(*datum.scalar());
        }
        MKQL_ENSURE(datum.is_array(), "Expecting array");
        return Readers_[idx]->GetItem(*datum.array(), Current_);
    }

    // Convert the current item of the given column into an unboxed value
    // (used e.g. as a key for dict lookups).
    NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
        return Converters_[idx]->MakeValue(GetItem(idx), holderFactory);
    }

    void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) {
        Builders_[idx]->Add(value);
    }

    void AddItem(const TBlockItem& item, size_t idx) {
        Builders_[idx]->Add(item);
    }

    // Append the current left-flow row to the builders as-is (no right-hand
    // columns are produced).
    void CopyRow() {
        // Copy items from the "left" flow.
        for (size_t i = 0; i < InputWidth_; i++) {
            AddItem(GetItem(i), i);
        }
        OutputRows_++;
    }

    // Append the current left-flow row combined with the given right-hand
    // dict value; an empty value fills the right-hand columns with nulls.
    void MakeRow(const NUdf::TUnboxedValuePod& value) {
        // Copy items from the "left" flow.
        for (size_t i = 0; i < InputWidth_; i++) {
            AddItem(GetItem(i), i);
        }
        // Convert and append items from the "right" dict.
        if (value) {
            for (size_t i = InputWidth_, j = 0; i < OutputWidth_; i++, j++) {
                AddValue(value.GetElement(j), i);
            }
        } else {
            for (size_t i = InputWidth_; i < OutputWidth_; i++) {
                AddValue(value, i);
            }
        }
        OutputRows_++;
    }

    // Bulk-append rows of an input array column selected by a sparse bitmap.
    void CopyArray(size_t idx, size_t popCount, const ui8* sparseBitmap, size_t bitmapSize) {
        const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
        Y_ENSURE(datum.is_array());
        Builders_[idx]->AddMany(*datum.array(), popCount, sparseBitmap, bitmapSize);
    }

    // Flush the builders: collect the built arrays (flattening chunked
    // arrays) into the per-column deques for subsequent slicing.
    void MakeBlocks() {
        if (OutputRows_ == 0) {
            return;
        }
        BuilderAllocatedSize_ = 0;

        for (size_t i = 0; i < Builders_.size(); i++) {
            const auto& datum = Builders_[i]->Build(IsFinished_);
            Deques[i].clear();
            MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
            ForEachArrayData(datum, [this, i](const auto& arrayData) {
                Deques[i].push_back(arrayData);
            });
        }
    }

    // Cut the next uniform slice: its size is the minimum head-array length
    // across all columns, so every column contributes the same row range.
    ui64 Slice() {
        auto sliceSize = OutputRows_;
        for (size_t i = 0; i < Deques.size(); i++) {
            const auto& arrays = Deques[i];
            if (arrays.empty()) {
                continue;
            }
            Y_ABORT_UNLESS(ui64(arrays.front()->length) <= OutputRows_);
            sliceSize = std::min<ui64>(sliceSize, arrays.front()->length);
        }

        for (size_t i = 0; i < Arrays.size(); i++) {
            auto& arrays = Deques[i];
            if (arrays.empty()) {
                continue;
            }
            // Take the whole head array if it matches the slice size exactly;
            // otherwise chop the slice off its front and keep the remainder.
            if (auto& head = arrays.front(); ui64(head->length) == sliceSize) {
                Arrays[i] = std::move(head);
                arrays.pop_front();
            } else {
                Arrays[i] = Chop(head, sliceSize);
            }
        }

        OutputRows_ -= sliceSize;
        return sliceSize;
    }

    // Produce the output value for the given column of the current slice.
    NUdf::TUnboxedValuePod Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const {
        MKQL_ENSURE(idx <= OutputWidth_, "Deques index overflow");
        // Return the slice length as the last column value (i.e. block length).
        if (idx == OutputWidth_) {
            return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
        }
        if (auto array = Arrays[idx]) {
            return holderFactory.CreateArrowBlock(std::move(array));
        } else {
            return NUdf::TUnboxedValuePod();
        }
    }

};

void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
state = ctx.HolderFactory.Create<TState>(ctx, LeftFlowItems_, ResultJoinItems_, ctx.WideFields.data() + WideFieldsIndex_);
}
Expand Down
10 changes: 3 additions & 7 deletions ydb/library/yql/minikql/computation/mkql_block_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,9 @@ void TBlockState::FillArrays() {
return;
}
MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
if (datum.is_array()) {
Deques[i].push_back(datum.array());
} else {
for (auto& chunk : datum.chunks()) {
Deques[i].push_back(chunk->data());
}
}
ForEachArrayData(datum, [this, i](const auto& arrayData) {
Deques[i].push_back(arrayData);
});
}
}
}
Expand Down

0 comments on commit 47066e9

Please sign in to comment.