Skip to content

Commit

Permalink
additional test and addressing fixes (#7168)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jul 29, 2024
1 parent 4743435 commit 46db233
Show file tree
Hide file tree
Showing 23 changed files with 516 additions and 285 deletions.
114 changes: 86 additions & 28 deletions ydb/core/formats/arrow/accessor/abstract/accessor.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#include "accessor.h"

#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/permutations.h>
#include <ydb/core/formats/arrow/save_load/saver.h>
#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/formats/arrow/splitter/simple.h>
#include <ydb/core/formats/arrow/switch/compare.h>
#include <ydb/core/formats/arrow/switch/switch_type.h>

#include <ydb/library/actors/core/log.h>
#include <ydb/core/formats/arrow/permutations.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/splitter/simple.h>
#include <ydb/core/formats/arrow/save_load/saver.h>

namespace NKikimr::NArrow::NAccessor {

Expand All @@ -17,18 +19,18 @@ void IChunkedArray::TReader::AppendPositionTo(arrow::ArrayBuilder& builder, cons

// Copies the single record with global index `recordIndex` into a separate array.
// The record is located through GetReadChunk(); the chunk array and the chunk-local
// position it reports are then handed to NArrow::CopyRecords as a one-element position list.
std::shared_ptr<arrow::Array> IChunkedArray::TReader::CopyRecord(const ui64 recordIndex) const {
    auto address = GetReadChunk(recordIndex);
    return NArrow::CopyRecords(address.GetArray(), { address.GetPosition() });
}

std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, const ui32 count) const {
AFL_VERIFY(offset + count <= (ui64)GetRecordsCount())("offset", offset)("count", count)("length", GetRecordsCount());
ui32 currentOffset = offset;
ui32 countLeast = count;
std::vector<std::shared_ptr<arrow::Array>> chunks;
auto address = GetChunk({}, offset);
auto address = GetChunkSlow(offset);
while (countLeast) {
address = GetChunk(address, currentOffset);
const ui64 internalPos = currentOffset - address.GetStartPosition();
address = GetChunk(address.GetAddress(), currentOffset);
const ui64 internalPos = address.GetAddress().GetLocalIndex(currentOffset);
if (internalPos + countLeast <= (ui64)address.GetArray()->length()) {
chunks.emplace_back(address.GetArray()->Slice(internalPos, countLeast));
break;
Expand All @@ -43,12 +45,73 @@ std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, con
return std::make_shared<arrow::ChunkedArray>(chunks, DataType);
}

// Resolves the data chunk that holds the record at global index `position`.
//
// chunkCurrent - optional address chain from a previous lookup. Here it is only depth-checked and
//                forwarded to GetArray(); the hint passed to GetLocalData() is always empty.
// position     - global record index; verified to be below GetRecordsCount().
//
// Returns the chunk's array together with its address chain; the chain is verified to contain `position`.
NKikimr::NArrow::NAccessor::IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(
    const std::optional<TAddressChain>& chunkCurrent, const ui64 position) const {
    AFL_VERIFY(position < GetRecordsCount());
    // NOTE(review): this hint is never populated from chunkCurrent, so GetLocalData() always
    // receives an empty optional — confirm whether that is intended.
    std::optional<TCommonChunkAddress> emptyHint;

    if (!IsDataOwner()) {
        // Composite array: first find the data-owning array and the chain of addresses leading to it.
        auto ownerAddress = GetArray(chunkCurrent, position, nullptr);
        if (chunkCurrent) {
            AFL_VERIFY(chunkCurrent->GetSize() == 1 + ownerAddress.GetAddress().GetSize())("current", chunkCurrent->GetSize())(
                "chunked", ownerAddress.GetAddress().GetSize());
        }
        // The local index must be taken before the chain is moved out of ownerAddress below.
        auto dataAddress = ownerAddress.GetArray()->GetLocalData(emptyHint, ownerAddress.GetAddress().GetLocalIndex(position));
        auto resultChain = std::move(ownerAddress.MutableAddress());
        resultChain.Add(dataAddress.GetAddress());
        AFL_VERIFY(resultChain.Contains(position));
        return TFullDataAddress(dataAddress.GetArray(), std::move(resultChain));
    }

    // This object owns the data itself, so the chain consists of exactly one address.
    if (chunkCurrent) {
        AFL_VERIFY(chunkCurrent->GetSize() == 1)("size", chunkCurrent->GetSize());
    }
    auto dataAddress = GetLocalData(emptyHint, position);
    TAddressChain resultChain;
    resultChain.Add(dataAddress.GetAddress());
    AFL_VERIFY(resultChain.Contains(position));
    return TFullDataAddress(dataAddress.GetArray(), std::move(resultChain));
}

// Finds the data-owning chunked array that contains global index `position` and the chain of
// chunk addresses that leads to it.
//
// chunkCurrent - optional chain from a previous lookup; its element at each depth is passed to
//                GetLocalChunkedArray() as a hint for that level.
// position     - global record index; verified to be below GetRecordsCount().
// selfPtr      - shared pointer returned when this object is itself the data owner; verified to be
//                non-null in that case and not used otherwise.
IChunkedArray::TFullChunkedArrayAddress IChunkedArray::GetArray(
    const std::optional<TAddressChain>& chunkCurrent, const ui64 position, const std::shared_ptr<IChunkedArray>& selfPtr) const {
    AFL_VERIFY(position < GetRecordsCount());
    if (IsDataOwner()) {
        AFL_VERIFY(selfPtr);
        TAddressChain chain;
        // Single address built from (0, GetRecordsCount(), 0) — presumably start, finish and chunk index; confirm.
        chain.Add(TCommonChunkAddress(0, GetRecordsCount(), 0));
        return IChunkedArray::TFullChunkedArrayAddress(selfPtr, std::move(chain));
    }
    TAddressChain addressChain;

    auto* currentLevel = this;
    // Kept as ui64: the incoming position is ui64 and a ui32 local would silently truncate large indices.
    ui64 currentPosition = position;
    ui32 idx = 0;
    // Holds every intermediate array so the raw `currentLevel` pointer stays valid while descending.
    std::vector<std::shared_ptr<IChunkedArray>> chainForTemporarySave;
    while (!currentLevel->IsDataOwner()) {
        std::optional<TCommonChunkAddress> currentAddress;
        if (chunkCurrent) {
            currentAddress = chunkCurrent->GetAddress(idx);
        }
        auto nextChunkedArray = currentLevel->GetLocalChunkedArray(currentAddress, currentPosition);
        chainForTemporarySave.emplace_back(nextChunkedArray.GetArray());
        currentLevel = chainForTemporarySave.back().get();
        addressChain.Add(nextChunkedArray.GetAddress());
        AFL_VERIFY(nextChunkedArray.GetAddress().GetStartPosition() <= currentPosition);
        // Re-base the position so it is relative to the start of the child array.
        currentPosition -= nextChunkedArray.GetAddress().GetStartPosition();
        ++idx;
    }
    AFL_VERIFY(!chunkCurrent || chunkCurrent->GetSize() - idx <= 1)("idx", idx)("size", chunkCurrent->GetSize());
    return TFullChunkedArrayAddress(chainForTemporarySave.back(), std::move(addressChain));
}

// Renders the record at global index `position` as text: the record is located through
// GetReadChunk() and the chunk array plus chunk-local position are passed to NArrow::DebugString.
TString IChunkedArray::TReader::DebugString(const ui32 position) const {
    TAddress readChunk = GetReadChunk(position);
    return NArrow::DebugString(readChunk.GetArray(), readChunk.GetPosition());
}

std::partial_ordering IChunkedArray::TReader::CompareColumns(const std::vector<TReader>& l, const ui64 lPosition, const std::vector<TReader>& r, const ui64 rPosition) {
std::partial_ordering IChunkedArray::TReader::CompareColumns(
const std::vector<TReader>& l, const ui64 lPosition, const std::vector<TReader>& r, const ui64 rPosition) {
AFL_VERIFY(l.size() == r.size());
for (ui32 i = 0; i < l.size(); ++i) {
const TAddress lAddress = l[i].GetReadChunk(lPosition);
Expand All @@ -63,43 +126,38 @@ std::partial_ordering IChunkedArray::TReader::CompareColumns(const std::vector<T

// Returns the chunk array and chunk-local index for the record at global index `position`.
// The last resolved chunk is kept in CurrentChunkAddress and reused while it still contains
// `position`; otherwise the chunk is looked up again through ChunkedArray->GetChunk(), with the
// previous address passed along.
IChunkedArray::TAddress IChunkedArray::TReader::GetReadChunk(const ui64 position) const {
    AFL_VERIFY(position < ChunkedArray->GetRecordsCount());
    const bool cachedChunkFits = CurrentChunkAddress && CurrentChunkAddress->GetAddress().Contains(position);
    if (!cachedChunkFits) {
        CurrentChunkAddress = ChunkedArray->GetChunk(CurrentChunkAddress, position);
    }
    return IChunkedArray::TAddress(CurrentChunkAddress->GetArray(), CurrentChunkAddress->GetAddress().GetLocalIndex(position));
}

// Orders the record this address points at against the record `item` points at.
// Both sides are given to TComparator::TypedCompare<true> as (array, position-in-array) pairs.
const std::partial_ordering IChunkedArray::TAddress::Compare(const TAddress& item) const {
    auto& ownArray = *Array;
    auto& otherArray = *item.Array;
    return TComparator::TypedCompare<true>(ownArray, Position, otherArray, item.Position);
}

// Pairs a chunked array with its serialized representation.
// Verifies that `serializedData` evaluates to true (presumably non-empty — confirm TString semantics),
// that the array pointer is set, and that the array reports a non-zero record count.
TChunkedArraySerialized::TChunkedArraySerialized(const std::shared_ptr<IChunkedArray>& array, const TString& serializedData)
    : Array(array)
    , SerializedData(serializedData) {
    AFL_VERIFY(serializedData);
    AFL_VERIFY(Array);
    AFL_VERIFY(Array->GetRecordsCount());
}

// Orders the record at global index `position` in this chunk against the record at global index
// `itemPosition` in `item`. Each position is verified to lie inside its address and is then
// translated to a chunk-local index before TComparator::TypedCompare<true> is applied.
std::partial_ordering IChunkedArray::TFullDataAddress::Compare(
    const ui64 position, const TFullDataAddress& item, const ui64 itemPosition) const {
    AFL_VERIFY(Address.Contains(position))("pos", position)("start", Address.DebugString());
    AFL_VERIFY(item.Address.Contains(itemPosition))("pos", itemPosition)("start", item.Address.DebugString());
    return TComparator::TypedCompare<true>(*Array, Address.GetLocalIndex(position), *item.Array, item.Address.GetLocalIndex(itemPosition));
}

std::shared_ptr<arrow::Array> IChunkedArray::TCurrentChunkAddress::CopyRecord(const ui64 recordIndex) const {
AFL_VERIFY(GetStartPosition() <= recordIndex);
AFL_VERIFY(recordIndex < GetFinishPosition());
return NArrow::CopyRecords(Array, { recordIndex - GetStartPosition() });
std::shared_ptr<arrow::Array> IChunkedArray::TFullDataAddress::CopyRecord(const ui64 recordIndex) const {
return NArrow::CopyRecords(Array, { Address.GetLocalIndex(recordIndex) });
}

// Renders the record with global index `position` as text. The index is translated with
// Address.GetLocalIndex() and passed, together with the chunk array, to NArrow::DebugString.
// NOTE(review): there is no explicit range check here; presumably GetLocalIndex() validates
// that `position` belongs to this address — confirm.
TString IChunkedArray::TFullDataAddress::DebugString(const ui64 position) const {
    return NArrow::DebugString(Array, Address.GetLocalIndex(position));
}

}
} // namespace NKikimr::NArrow::NAccessor
Loading

0 comments on commit 46db233

Please sign in to comment.