From dcafd3292342a85ddf167d6f4de4fe40600943bb Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Thu, 30 Jan 2025 16:56:52 -0800 Subject: [PATCH] feat: Add initial index lookup join operator implementation (#12218) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/12218 Add the initial index lookup operator implementation which supports inner join and left join from an index source. The index lookup join operator takes input from the probe side with one batch at a time. For each probe input batch, it sends the lookup request from the index source and gets a lookup result iterator. The result iterator produces the output in batches through a future for async processing at the storage backend. For each lookup result, the index join operator produces one or more output batches based on the operator's output batch size limit. For left join, the index join operator needs to detect the input request rows that have no matches and produce output with nulls for the columns from the lookup table. Reviewed By: mbasmanova Differential Revision: D68906030 fbshipit-source-id: 4038c31653bc96893497b3c26d935fa2f7b58f43 --- velox/exec/CMakeLists.txt | 1 + velox/exec/IndexLookupJoin.cpp | 595 +++++++++++++++++++ velox/exec/IndexLookupJoin.h | 161 ++++++ velox/exec/LocalPlanner.cpp | 7 + velox/exec/tests/IndexLookupJoinTest.cpp | 698 +++++++++++++++++++++-- 5 files changed, 1426 insertions(+), 36 deletions(-) create mode 100644 velox/exec/IndexLookupJoin.cpp create mode 100644 velox/exec/IndexLookupJoin.h diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 263f43b0b5e9..4a6315f43639 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -41,6 +41,7 @@ velox_add_library( HashPartitionFunction.cpp HashProbe.cpp HashTable.cpp + IndexLookupJoin.cpp JoinBridge.cpp Limit.cpp LocalPartition.cpp diff --git a/velox/exec/IndexLookupJoin.cpp b/velox/exec/IndexLookupJoin.cpp new file mode 100644 index 000000000000..89d0d9b23d9d --- /dev/null +++ b/velox/exec/IndexLookupJoin.cpp @@ -0,0 +1,595 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/IndexLookupJoin.h" + +#include "velox/buffer/Buffer.h" +#include "velox/connectors/Connector.h" +#include "velox/exec/Task.h" +#include "velox/expression/Expr.h" +#include "velox/expression/FieldReference.h" + +namespace facebook::velox::exec { +namespace { +void duplicateJoinKeyCheck( + const std::vector& keys) { + folly::F14FastSet lookupKeyNames; + for (const auto& key : keys) { + lookupKeyNames.insert(key->name()); + } + VELOX_USER_CHECK_EQ(lookupKeyNames.size(), keys.size()); +} +} // namespace +IndexLookupJoin::IndexLookupJoin( + int32_t operatorId, + DriverCtx* driverCtx, + const std::shared_ptr& joinNode) + : Operator( + driverCtx, + joinNode->outputType(), + operatorId, + joinNode->id(), + "IndexLookupJoin"), + // TODO: support to update output batch size with output size stats during + // the lookup processing. + outputBatchSize_{outputBatchRows()}, + joinType_{joinNode->joinType()}, + numKeys_{joinNode->leftKeys().size()}, + probeType_{joinNode->sources()[0]->outputType()}, + lookupType_{joinNode->lookupSource()->outputType()}, + lookupTableHandle_{joinNode->lookupSource()->tableHandle()}, + lookupColumnHandles_(joinNode->lookupSource()->assignments()), + connectorQueryCtx_{operatorCtx_->createConnectorQueryCtx( + lookupTableHandle_->connectorId(), + planNodeId(), + driverCtx->task->addConnectorPoolLocked( + planNodeId(), + driverCtx->pipelineId, + driverCtx->driverId, + operatorType(), + lookupTableHandle_->connectorId()), + spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr)}, + expressionEvaluator_(connectorQueryCtx_->expressionEvaluator()), + connector_(connector::getConnector(lookupTableHandle_->connectorId())), + joinNode_{joinNode} { + VELOX_CHECK_EQ(joinNode_->sources()[1], joinNode_->lookupSource()); + duplicateJoinKeyCheck(joinNode_->leftKeys()); + duplicateJoinKeyCheck(joinNode_->rightKeys()); +} + +void IndexLookupJoin::initialize() { + Operator::initialize(); + VELOX_CHECK_NOT_NULL(joinNode_); + VELOX_CHECK_NULL(indexSource_); + + SCOPE_EXIT { + joinNode_.reset(); + }; + + initLookupInput(); + initLookupOutput(); + initOutputProjections(); + + indexSource_ = connector_->createIndexSource( + lookupInputType_, + numKeys_, + lookupConditions_, + lookupOutputType_, + lookupTableHandle_, + lookupColumnHandles_, + connectorQueryCtx_.get()); +} + +void IndexLookupJoin::initLookupInput() { + VELOX_CHECK_NULL(lookupInputType_); + VELOX_CHECK(lookupInputChannels_.empty()); + + std::vector lookupInputNames; + lookupInputNames.reserve(numKeys_ + lookupConditions_.size()); + std::vector lookupInputTypes; + lookupInputTypes.reserve(numKeys_ + lookupConditions_.size()); + lookupInputChannels_.reserve(numKeys_ + lookupConditions_.size()); + + SCOPE_EXIT { + VELOX_CHECK_GE( + lookupInputNames.size(), numKeys_ + lookupConditions_.size()); + VELOX_CHECK_EQ(lookupInputNames.size(), lookupInputChannels_.size()); + lookupInputType_ = + ROW(std::move(lookupInputNames), std::move(lookupInputTypes)); + }; + + // List probe key columns used in join-equi caluse first. + folly::F14FastSet probeKeyColumnNames; + for (auto keyIdx = 0; keyIdx < numKeys_; ++keyIdx) { + lookupInputNames.emplace_back(joinNode_->leftKeys()[keyIdx]->name()); + const auto probeKeyChannel = + probeType_->getChildIdx(lookupInputNames.back()); + lookupInputChannels_.emplace_back(probeKeyChannel); + lookupInputTypes.emplace_back(probeType_->childAt(probeKeyChannel)); + VELOX_CHECK_EQ(probeKeyColumnNames.count(lookupInputNames.back()), 0); + probeKeyColumnNames.insert(lookupInputNames.back()); + } + + if (lookupConditions_.empty()) { + return; + } + + folly::F14FastSet probeConditionColumnNames; + folly::F14FastSet lookupConditionColumnNames; + for (const auto& lookupCondition : lookupConditions_) { + const auto lookupConditionExprSet = + expressionEvaluator_->compile(lookupCondition); + const auto& lookupConditionExpr = lookupConditionExprSet->expr(0); + + int numProbeColumns{0}; + int numLookupColumns{0}; + for (auto& input : lookupConditionExpr->distinctFields()) { + const auto& columnName = input->field(); + auto probeIndexOpt = probeType_->getChildIdxIfExists(columnName); + if (probeIndexOpt.has_value()) { + ++numProbeColumns; + // There is no overlap between probe key columns and probe condition + // columns. + VELOX_CHECK_EQ(probeKeyColumnNames.count(columnName), 0); + // We allow the probe column used in more than one lookup conditions. + if (probeConditionColumnNames.count(columnName) == 0) { + probeConditionColumnNames.insert(columnName); + lookupInputChannels_.push_back(probeIndexOpt.value()); + lookupInputNames.push_back(columnName); + lookupInputTypes.push_back(input->type()); + } + continue; + } + + ++numLookupColumns; + auto lookupIndexOpt = lookupType_->getChildIdxIfExists(columnName); + VELOX_CHECK( + lookupIndexOpt.has_value(), + "Lookup condition column {} is not found", + columnName); + // A lookup column can only be used in on lookup condition. + VELOX_CHECK( + lookupConditionColumnNames.count(columnName), + 0, + "Lookup condition column {} from lookup table used in more than one lookup conditions", + input->field()); + lookupConditionColumnNames.insert(input->field()); + } + + VELOX_CHECK_EQ( + numLookupColumns, + 1, + "Unexpected number of lookup columns in lookup condition {}", + lookupConditionExpr->toString()); + VELOX_CHECK_GT( + numProbeColumns, + 0, + "No probe columns found in lookup condition {}", + lookupConditionExpr->toString()); + } +} + +void IndexLookupJoin::initLookupOutput() { + VELOX_CHECK_NULL(lookupOutputType_); + + std::vector lookupOutputNames; + std::vector lookupOutputTypes; + const auto& lookupSourceOutputType = joinNode_->lookupSource()->outputType(); + for (auto i = 0; i < outputType_->size(); ++i) { + const auto& name = outputType_->nameOf(i); + const auto lookupChannelOpt = + lookupSourceOutputType->getChildIdxIfExists(name); + if (!lookupChannelOpt.has_value()) { + continue; + } + lookupOutputNames.push_back(name); + lookupOutputTypes.push_back( + lookupSourceOutputType->childAt(lookupChannelOpt.value())); + VELOX_CHECK(outputType_->childAt(i)->equivalent(*lookupOutputTypes.back())); + } + // TODO: support index lookup without output value columns. + VELOX_CHECK( + !lookupOutputNames.empty(), + "Must read at least one value column from index lookup table"); + lookupOutputType_ = + ROW(std::move(lookupOutputNames), std::move(lookupOutputTypes)); +} + +void IndexLookupJoin::initOutputProjections() { + for (auto i = 0; i < probeType_->size(); ++i) { + const auto name = probeType_->nameOf(i); + const auto outputChannelOpt = outputType_->getChildIdxIfExists(name); + if (!outputChannelOpt.has_value()) { + continue; + } + probeOutputProjections_.emplace_back(i, outputChannelOpt.value()); + } + if (joinType_ == core::JoinType::kLeft) { + VELOX_USER_CHECK( + !probeOutputProjections_.empty(), + "Lookup join with left join type must read at least one column from probe side"); + } + + for (auto i = 0; i < lookupOutputType_->size(); ++i) { + const auto& name = lookupOutputType_->nameOf(i); + VELOX_USER_CHECK_EQ( + lookupColumnHandles_.count(name), + 1, + "Lookup output column {} is not found in lookup table handle", + name); + const auto outputChannelOpt = outputType_->getChildIdxIfExists(name); + if (!outputChannelOpt.has_value()) { + continue; + } + lookupOutputProjections_.emplace_back(i, outputChannelOpt.value()); + } + VELOX_USER_CHECK_EQ( + probeOutputProjections_.size() + lookupOutputProjections_.size(), + outputType_->size()); +} + +BlockingReason IndexLookupJoin::isBlocked(ContinueFuture* future) { + if (!lookupFuture_.valid()) { + return BlockingReason::kNotBlocked; + } + *future = std::move(lookupFuture_); + return BlockingReason::kWaitForIndexLookup; +} + +void IndexLookupJoin::addInput(RowVectorPtr input) { + VELOX_CHECK_GT(input->size(), 0); + VELOX_CHECK_NULL(input_); + input_ = std::move(input); +} + +RowVectorPtr IndexLookupJoin::getOutput() { + if (input_ == nullptr) { + return nullptr; + } + + if (lookupResultIter_ == nullptr) { + VELOX_CHECK(!lookupFuture_.valid()); + prepareLookupInput(); + lookup(); + } + + VELOX_CHECK_NOT_NULL(lookupResultIter_); + auto output = getOutputFromLookupResult(); + if (output == nullptr) { + return nullptr; + } + if (output->size() == 0) { + return nullptr; + } + return output; +} + +void IndexLookupJoin::prepareLookupInput() { + VELOX_CHECK_NOT_NULL(input_); + if (lookupInput_ == nullptr) { + lookupInput_ = + BaseVector::create(lookupInputType_, input_->size(), pool()); + } else { + VectorPtr input = std::move(lookupInput_); + BaseVector::prepareForReuse(input, input_->size()); + lookupInput_ = std::static_pointer_cast(input); + } + + for (auto i = 0; i < lookupInputType_->size(); ++i) { + lookupInput_->childAt(i) = input_->childAt(lookupInputChannels_[i]); + lookupInput_->childAt(i)->loadedVector(); + } +} + +void IndexLookupJoin::lookup() { + VELOX_CHECK_NOT_NULL(indexSource_); + VELOX_CHECK_NOT_NULL(input_); + VELOX_CHECK_NOT_NULL(lookupInput_); + VELOX_CHECK_NULL(lookupResultIter_); + VELOX_CHECK_EQ(lookupInput_->size(), input_->size()); + + lookupResultIter_ = + indexSource_->lookup(connector::IndexSource::LookupRequest{lookupInput_}); +} + +RowVectorPtr IndexLookupJoin::getOutputFromLookupResult() { + VELOX_CHECK_NOT_NULL(input_); + VELOX_CHECK_NOT_NULL(lookupResultIter_); + + if (lookupResult_ == nullptr) { + auto resultOptional = + lookupResultIter_->next(outputBatchSize_, lookupFuture_); + if (!resultOptional.has_value()) { + VELOX_CHECK(lookupFuture_.valid()); + return nullptr; + } + VELOX_CHECK(!lookupFuture_.valid()); + + lookupResult_ = std::move(resultOptional).value(); + if (lookupResult_ == nullptr) { + if (hasRemainingOutputForLeftJoin()) { + return produceRemainingOutputForLeftJoin(); + } + finishInput(); + return nullptr; + } + rawLookupInputHitIndices_ = + lookupResult_->inputHits->as(); + } + VELOX_CHECK_NOT_NULL(lookupResult_); + + SCOPE_EXIT { + maybeFinishLookupResult(); + }; + if (joinType_ == core::JoinType::kInner) { + return produceOutputForInnerJoin(); + } + return produceOutputForLeftJoin(); +} + +void IndexLookupJoin::maybeFinishLookupResult() { + VELOX_CHECK_NOT_NULL(lookupResult_); + if (nextOutputResultRow_ == lookupResult_->size()) { + lookupResult_ = nullptr; + nextOutputResultRow_ = 0; + rawLookupInputHitIndices_ = nullptr; + } +} + +bool IndexLookupJoin::hasRemainingOutputForLeftJoin() const { + if (joinType_ != core::JoinType::kLeft) { + return false; + } + if ((lastProcessedInputRow_.value_or(-1) + 1) >= input_->size()) { + return false; + } + return true; +} + +void IndexLookupJoin::finishInput() { + VELOX_CHECK_NOT_NULL(input_); + VELOX_CHECK_NOT_NULL(lookupResultIter_); + VELOX_CHECK(!lookupFuture_.valid()); + + lookupResultIter_ = nullptr; + lookupResult_ = nullptr; + lastProcessedInputRow_ = std::nullopt; + nextOutputResultRow_ = 0; + input_ = nullptr; +} + +void IndexLookupJoin::prepareOutput(vector_size_t numOutputRows) { + if (output_ == nullptr) { + output_ = BaseVector::create(outputType_, numOutputRows, pool()); + } else { + VectorPtr output = std::move(output_); + BaseVector::prepareForReuse(output, numOutputRows); + output_ = std::static_pointer_cast(output); + } +} + +RowVectorPtr IndexLookupJoin::produceOutputForInnerJoin() { + VELOX_CHECK_EQ(joinType_, core::JoinType::kInner); + VELOX_CHECK_NOT_NULL(lookupResult_); + VELOX_CHECK_LE(nextOutputResultRow_, lookupResult_->size()); + + const size_t numOutputRows = std::min( + lookupResult_->size() - nextOutputResultRow_, outputBatchSize_); + prepareOutput(numOutputRows); + if (numOutputRows == lookupResult_->size()) { + for (const auto& projection : probeOutputProjections_) { + output_->childAt(projection.outputChannel) = BaseVector::wrapInDictionary( + nullptr, + lookupResult_->inputHits, + numOutputRows, + input_->childAt(projection.inputChannel)); + } + for (const auto& projection : lookupOutputProjections_) { + output_->childAt(projection.outputChannel) = + lookupResult_->output->childAt(projection.inputChannel); + } + } else { + for (const auto& projection : probeOutputProjections_) { + output_->childAt(projection.outputChannel) = BaseVector::wrapInDictionary( + nullptr, + Buffer::slice( + lookupResult_->inputHits, + nextOutputResultRow_, + numOutputRows, + pool()), + numOutputRows, + input_->childAt(projection.inputChannel)); + } + for (const auto& projection : lookupOutputProjections_) { + output_->childAt(projection.outputChannel) = + lookupResult_->output->childAt(projection.inputChannel) + ->slice(nextOutputResultRow_, numOutputRows); + } + } + + nextOutputResultRow_ += numOutputRows; + VELOX_CHECK_LE(nextOutputResultRow_, lookupResult_->size()); + return output_; +} + +RowVectorPtr IndexLookupJoin::produceOutputForLeftJoin() { + VELOX_CHECK_EQ(joinType_, core::JoinType::kLeft); + VELOX_CHECK_NOT_NULL(lookupResult_); + VELOX_CHECK_LE(nextOutputResultRow_, lookupResult_->size()); + VELOX_CHECK_NOT_NULL(rawLookupInputHitIndices_); + + prepareOutputRowMappings(outputBatchSize_); + VELOX_CHECK_NOT_NULL(rawLookupOutputNulls_); + + size_t numOutputRows{0}; + size_t totalMissedInputRows{0}; + int32_t lastProcessedInputRow = lastProcessedInputRow_.value_or(-1); + for (; numOutputRows < outputBatchSize_ && + nextOutputResultRow_ < lookupResult_->size();) { + VELOX_CHECK_GE( + rawLookupInputHitIndices_[nextOutputResultRow_], lastProcessedInputRow); + const vector_size_t numMissedInputRows = + rawLookupInputHitIndices_[nextOutputResultRow_] - + lastProcessedInputRow - 1; + VELOX_CHECK_GE(numMissedInputRows, -1); + if (numMissedInputRows > 0) { + if (totalMissedInputRows == 0) { + bits::fillBits( + rawLookupOutputNulls_, 0, outputBatchSize_, bits::kNotNull); + } + const auto numOutputMissedInputRows = std::min( + numMissedInputRows, outputBatchSize_ - numOutputRows); + bits::fillBits( + rawLookupOutputNulls_, + numOutputRows, + numOutputMissedInputRows, + bits::kNull); + for (auto i = 0; i < numOutputMissedInputRows; ++i) { + rawProbeOutputRowIndices_[numOutputRows++] = ++lastProcessedInputRow; + } + totalMissedInputRows += numOutputMissedInputRows; + continue; + } + + rawProbeOutputRowIndices_[numOutputRows] = + rawLookupInputHitIndices_[nextOutputResultRow_]; + rawLookupOutputRowIndices_[numOutputRows] = nextOutputResultRow_; + lastProcessedInputRow = rawLookupInputHitIndices_[nextOutputResultRow_]; + ++nextOutputResultRow_; + ++numOutputRows; + } + VELOX_CHECK( + numOutputRows == outputBatchSize_ || + nextOutputResultRow_ == lookupResult_->size()); + VELOX_CHECK_LE(nextOutputResultRow_, lookupResult_->size()); + lastProcessedInputRow_ = lastProcessedInputRow; + + if (totalMissedInputRows > 0) { + lookupOutputNulls_->setSize(bits::nbytes(numOutputRows)); + } + probeOutputRowMapping_->setSize(numOutputRows * sizeof(vector_size_t)); + lookupOutputRowMapping_->setSize(numOutputRows * sizeof(vector_size_t)); + + if (numOutputRows == 0) { + return nullptr; + } + + prepareOutput(numOutputRows); + for (const auto& projection : probeOutputProjections_) { + output_->childAt(projection.outputChannel) = BaseVector::wrapInDictionary( + nullptr, + probeOutputRowMapping_, + numOutputRows, + input_->childAt(projection.inputChannel)); + } + for (const auto& projection : lookupOutputProjections_) { + output_->childAt(projection.outputChannel) = BaseVector::wrapInDictionary( + totalMissedInputRows > 0 ? lookupOutputNulls_ : nullptr, + lookupOutputRowMapping_, + numOutputRows, + lookupResult_->output->childAt(projection.inputChannel)); + } + return output_; +} + +RowVectorPtr IndexLookupJoin::produceRemainingOutputForLeftJoin() { + VELOX_CHECK_EQ(joinType_, core::JoinType::kLeft); + VELOX_CHECK_NULL(lookupResult_); + VELOX_CHECK(hasRemainingOutputForLeftJoin()); + VELOX_CHECK_NULL(rawLookupInputHitIndices_); + + prepareOutputRowMappings(outputBatchSize_); + VELOX_CHECK_NOT_NULL(rawLookupOutputNulls_); + + size_t lastProcessedInputRow = lastProcessedInputRow_.value_or(-1); + const size_t numOutputRows = std::min( + outputBatchSize_, input_->size() - lastProcessedInputRow - 1); + VELOX_CHECK_GT(numOutputRows, 0); + bits::fillBits(rawLookupOutputNulls_, 0, numOutputRows, bits::kNull); + for (auto outputRow = 0; outputRow < numOutputRows; ++outputRow) { + rawProbeOutputRowIndices_[outputRow] = ++lastProcessedInputRow; + } + lookupOutputNulls_->setSize(bits::nbytes(numOutputRows)); + probeOutputRowMapping_->setSize(numOutputRows * sizeof(vector_size_t)); + lookupOutputRowMapping_->setSize(numOutputRows * sizeof(vector_size_t)); + + prepareOutput(numOutputRows); + for (const auto& projection : probeOutputProjections_) { + output_->childAt(projection.outputChannel) = BaseVector::wrapInDictionary( + nullptr, + probeOutputRowMapping_, + numOutputRows, + input_->childAt(projection.inputChannel)); + } + for (const auto& projection : lookupOutputProjections_) { + output_->childAt(projection.outputChannel) = BaseVector::wrapInDictionary( + lookupOutputNulls_, + lookupOutputRowMapping_, + numOutputRows, + BaseVector::createNullConstant( + output_->type()->childAt(projection.outputChannel), + numOutputRows, + pool())); + } + lastProcessedInputRow_ = lastProcessedInputRow; + return output_; +} + +void IndexLookupJoin::prepareOutputRowMappings(size_t outputBatchSize) { + VELOX_CHECK_EQ(joinType_, core::JoinType::kLeft); + + const auto mappingByteSize = outputBatchSize * sizeof(vector_size_t); + if ((probeOutputRowMapping_ == nullptr) || + !probeOutputRowMapping_->unique() || + (probeOutputRowMapping_->capacity() < mappingByteSize)) { + probeOutputRowMapping_ = allocateIndices(outputBatchSize, pool()); + } else { + probeOutputRowMapping_->setSize(outputBatchSize); + } + rawProbeOutputRowIndices_ = + probeOutputRowMapping_->asMutable(); + + if ((lookupOutputRowMapping_ == nullptr) || + !lookupOutputRowMapping_->unique() || + (lookupOutputRowMapping_->capacity() < mappingByteSize)) { + lookupOutputRowMapping_ = allocateIndices(outputBatchSize, pool()); + } else { + lookupOutputRowMapping_->setSize(outputBatchSize); + } + rawLookupOutputRowIndices_ = + lookupOutputRowMapping_->asMutable(); + + const auto nullByteSize = bits::nbytes(outputBatchSize); + if (lookupOutputNulls_ == nullptr || !lookupOutputNulls_->unique() || + (lookupOutputNulls_->capacity() < nullByteSize)) { + lookupOutputNulls_ = allocateNulls(outputBatchSize, pool()); + } + rawLookupOutputNulls_ = lookupOutputNulls_->asMutable(); +} + +void IndexLookupJoin::close() { + // TODO: add close method for index source if needed to free up resource + // or shutdown index source gracefully. + indexSource_.reset(); + lookupResultIter_ = nullptr; + lookupInput_ = nullptr; + lookupResult_ = nullptr; + probeOutputRowMapping_ = nullptr; + lookupOutputRowMapping_ = nullptr; + lookupOutputNulls_ = nullptr; + + Operator::close(); +} +} // namespace facebook::velox::exec diff --git a/velox/exec/IndexLookupJoin.h b/velox/exec/IndexLookupJoin.h new file mode 100644 index 000000000000..69644c0141c4 --- /dev/null +++ b/velox/exec/IndexLookupJoin.h @@ -0,0 +1,161 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include "velox/exec/Operator.h" + +namespace facebook::velox::exec { + +class IndexLookupJoin : public Operator { + public: + IndexLookupJoin( + int32_t operatorId, + DriverCtx* driverCtx, + const std::shared_ptr& joinNode); + + void initialize() override; + + BlockingReason isBlocked(ContinueFuture* future) override; + + bool needsInput() const override { + if (noMoreInput_ || input_ != nullptr) { + return false; + } + return true; + } + + void addInput(RowVectorPtr input) override; + + RowVectorPtr getOutput() override; + + bool isFinished() override { + return noMoreInput_ && input_ == nullptr; + } + + void close() override; + + private: + // Initialize the lookup input and output type, and the output projections. + void initLookupInput(); + void initLookupOutput(); + void initOutputProjections(); + // Prepare lookup input for index source lookup for a given 'input_'. + void prepareLookupInput(); + + void lookup(); + + RowVectorPtr getOutputFromLookupResult(); + RowVectorPtr produceOutputForInnerJoin(); + RowVectorPtr produceOutputForLeftJoin(); + // Produces output for the remaining input rows that has no matches from the + // lookup at the end of current input batch processing. + RowVectorPtr produceRemainingOutputForLeftJoin(); + + // Returns true if we have remaining output rows from the current + // 'lookupResult_' after finishing processing all the output results from the + // current 'lookupResultIter_'. For left join, we need to produce for the all + // the input rows that have no matches in 'lookupResult_'. + bool hasRemainingOutputForLeftJoin() const; + + // Checks if we have finished processing the current 'lookupResult_'. If so, + // we reset 'lookupResult_' and corresponding processing state. + void maybeFinishLookupResult(); + + // Invoked after finished processing the current 'input_' batch. The function + // resets the input batch and the lookup result states. + void finishInput(); + + // Prepare output row mappings for the next output batch with max size of + // 'outputBatchSize'. This is only used by left join which needs to fill nulls + // for output rows without lookup matches. + void prepareOutputRowMappings(size_t outputBatchSize); + // Prepare 'output_' for the next output batch with size of 'numOutputRows'. + void prepareOutput(vector_size_t numOutputRows); + + // Maximum number of rows in the output batch. + const vector_size_t outputBatchSize_; + // Type of join. + const core::JoinType joinType_; + const size_t numKeys_; + const RowTypePtr probeType_; + const RowTypePtr lookupType_; + const std::shared_ptr lookupTableHandle_; + const std::vector lookupConditions_; + std::unordered_map> + lookupColumnHandles_; + const std::shared_ptr connectorQueryCtx_; + core::ExpressionEvaluator* const expressionEvaluator_; + const std::shared_ptr connector_; + + // The lookup join plan node used to initialize this operator and reset after + // that. + std::shared_ptr joinNode_; + + // The data type of the lookup input including probe side columns either used + // in equi-clauses or join conditions. + RowTypePtr lookupInputType_; + // The column channels in probe 'input_' referenced by 'lookupInputType_'. + std::vector lookupInputChannels_; + // The reused row vector for lookup input. + RowVectorPtr lookupInput_; + + // The data type of the lookup output from the lookup source. + RowTypePtr lookupOutputType_; + + // Used to project output columns from the probe input and lookup output. + std::vector probeOutputProjections_; + std::vector lookupOutputProjections_; + + std::shared_ptr indexSource_; + + // Used for synchronization with the async fetch result from index source + // through 'lookupResultIter_'. + ContinueFuture lookupFuture_{ContinueFuture::makeEmpty()}; + // Used to fetch lookup results for each input batch, and reset after + // processing all the outputs from the result. + std::unique_ptr + lookupResultIter_; + // Used to store the lookup result fetched from 'lookupResultIter_' for output + // processing. We might split the output result into multiple output batches + // based on the operator's output batch size limit. + std::unique_ptr lookupResult_; + + // Points to the next output row in 'lookupResult_' for processing until + // reaches to the end of 'lookupResult_'. + vector_size_t nextOutputResultRow_{0}; + + // Points to the input row in 'input_' that has matched in 'lookupResult_'. + // The gap between consecutive input row indices indicates the number of input + // rows that has no matches in 'lookupResult_'. The left join needs to fill + // the lookup output with nulls for these input rows. + const vector_size_t* rawLookupInputHitIndices_{nullptr}; + // This is set for left join to detect missed input rows. + // If not null, it points to the last processed input row in 'input_'. It is + // used with 'rawLookupInputHitIndices_' to detect input rows that has no + // match in 'lookupResult_'. + std::optional lastProcessedInputRow_; + + // Reusable buffers used by left join for output row mappings. + BufferPtr probeOutputRowMapping_; + vector_size_t* rawProbeOutputRowIndices_{nullptr}; + BufferPtr lookupOutputRowMapping_; + vector_size_t* rawLookupOutputRowIndices_{nullptr}; + BufferPtr lookupOutputNulls_; + uint64_t* rawLookupOutputNulls_{nullptr}; + + // The reusable output vector for the join output. + RowVectorPtr output_; +}; +} // namespace facebook::velox::exec diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp index 96911dc5d0d9..92288dec7853 100644 --- a/velox/exec/LocalPlanner.cpp +++ b/velox/exec/LocalPlanner.cpp @@ -26,6 +26,7 @@ #include "velox/exec/HashAggregation.h" #include "velox/exec/HashBuild.h" #include "velox/exec/HashProbe.h" +#include "velox/exec/IndexLookupJoin.h" #include "velox/exec/Limit.h" #include "velox/exec/MarkDistinct.h" #include "velox/exec/Merge.h" @@ -531,6 +532,12 @@ std::shared_ptr DriverFactory::createDriver( planNode)) { operators.push_back( std::make_unique(id, ctx.get(), joinNode)); + } else if ( + auto joinNode = + std::dynamic_pointer_cast( + planNode)) { + operators.push_back( + std::make_unique(id, ctx.get(), joinNode)); } else if ( auto aggregationNode = std::dynamic_pointer_cast(planNode)) { diff --git a/velox/exec/tests/IndexLookupJoinTest.cpp b/velox/exec/tests/IndexLookupJoinTest.cpp index 272ed92f778e..655cca0cca58 100644 --- a/velox/exec/tests/IndexLookupJoinTest.cpp +++ b/velox/exec/tests/IndexLookupJoinTest.cpp @@ -17,16 +17,20 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/connectors/Connector.h" #include "velox/core/PlanNode.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TestIndexStorageConnector.h" using namespace facebook::velox; using namespace facebook::velox::exec; using namespace facebook::velox::exec::test; using namespace facebook::velox::common::testutil; +namespace fecebook::velox::exec::test { namespace { -class IndexLookupJoinTest : public HiveConnectorTestBase { +class IndexLookupJoinTest : public HiveConnectorTestBase, + public testing::WithParamInterface { protected: IndexLookupJoinTest() = default; @@ -36,9 +40,30 @@ class IndexLookupJoinTest : public HiveConnectorTestBase { connector::hive::HiveColumnHandle::registerSerDe(); Type::registerSerDe(); core::ITypedExpr::registerSerDe(); + connector::registerConnectorFactory( + std::make_shared()); + std::shared_ptr connector = + connector::getConnectorFactory(kTestIndexConnectorName) + ->newConnector( + kTestIndexConnectorName, + {}, + nullptr, + connectorCpuExecutor_.get()); + connector::registerConnector(connector); + + // TODO: extend to support multiple key columns. + keyType_ = ROW({"u0"}, {BIGINT()}); + valueType_ = ROW({"u1", "u2", "u3"}, {BIGINT(), BIGINT(), VARCHAR()}); + tableType_ = ROW( + {"u0", "u1", "u2", "u3"}, {BIGINT(), BIGINT(), BIGINT(), VARCHAR()}); + probeType_ = ROW( + {"t0", "t1", "t2", "t3"}, {BIGINT(), BIGINT(), VARCHAR(), BIGINT()}); + + TestIndexTableHandle::registerSerDe(); } void TearDown() override { + connector::unregisterConnectorFactory(kTestIndexConnectorName); HiveConnectorTestBase::TearDown(); } @@ -49,53 +74,252 @@ class IndexLookupJoinTest : public HiveConnectorTestBase { ASSERT_EQ(plan->toString(true, true), copy->toString(true, true)); } -}; -class IndexTableHandle : public connector::ConnectorTableHandle { - public: - explicit IndexTableHandle(std::string connectorId) - : ConnectorTableHandle(std::move(connectorId)) {} + // Generate index lookup table data. + // @param numKeys: number of unique keys. + // @param numDuplicatePerKey: number of duplicate rows per unique key so the + // total number of rows in the table is numKeys * + // numDuplicatePerKey. + // @param keyData: output key column vectors. This is used to populate the + // hash table in the index table. + // @param valueData: output value column vectors. This is used to populate the + // hash table in the index table. + // @param tableData: output table column vectors including key and value. This + // is used to populate duckdb table. + void generateIndexTableData( + size_t numKeys, + size_t numDuplicatePerKey, + RowVectorPtr& keyData, + RowVectorPtr& valueData, + RowVectorPtr& tableData) { + const int numRows = numKeys * numDuplicatePerKey; + VectorFuzzer::Options opts; + opts.vectorSize = numRows; + opts.nullRatio = 0.0; + VectorFuzzer fuzzer(opts, pool_.get()); + + keyData = fuzzer.fuzzInputFlatRow(keyType_); + valueData = fuzzer.fuzzInputFlatRow(valueType_); + keyData->childAt(0) = makeFlatVector( + keyData->size(), + [numDuplicatePerKey](auto row) { return row / numDuplicatePerKey; }); + std::vector tableColumns; + VELOX_CHECK_EQ(tableType_->size(), keyType_->size() + valueType_->size()); + tableColumns.reserve(tableType_->size()); + for (auto i = 0; i < keyType_->size(); ++i) { + tableColumns.push_back(keyData->childAt(i)); + } + for (auto i = 0; i < valueType_->size(); ++i) { + tableColumns.push_back(valueData->childAt(i)); + } + tableData = makeRowVector(tableType_->names(), tableColumns); + } + + // Generate probe input for lookup join. + // @param numBatches: number of probe batches. + // @param batchSize: number of rows in each probe batch. + // @param tableKey: key column vectors of the index table. This is used to set + // the key range for the probe input with specified lookup + // match percentage for convenience. + // @param matchPct: percentage of rows in the probe input that matches with + // the rows in index table. + std::vector generateProbeTableInput( + size_t numBatches, + size_t batchSize, + const RowVectorPtr& tableKey, + size_t matchPct) { + VELOX_CHECK_LE(matchPct, 100); + std::vector probeInputs; + probeInputs.reserve(numBatches); + VectorFuzzer::Options opts; + opts.vectorSize = batchSize; + // TODO: add nullable handling later. + opts.nullRatio = 0.0; + VectorFuzzer fuzzer(opts, pool_.get()); + for (int i = 0; i < numBatches; ++i) { + probeInputs.push_back(fuzzer.fuzzInputRow(probeType_)); + } - ~IndexTableHandle() override = default; + if (tableKey->size() == 0) { + return probeInputs; + } - std::string toString() const override { - static const std::string str{"IndexTableHandle"}; - return str; + // Set the key range for the probe input either within or outside the table + // key range based on the specified match percentage. + auto* flatKeyVector = tableKey->childAt(0)->asFlatVector(); + const auto minKey = flatKeyVector->valueAt(0); + const auto maxKey = flatKeyVector->valueAt(flatKeyVector->size() - 1); + for (int i = 0, row = 0; i < numBatches; ++i) { + probeInputs[i]->childAt(0)->loadedVector(); + BaseVector::flattenVector(probeInputs[i]->childAt(0)); + auto* flatProbeKeyVector = + probeInputs[i]->childAt(0)->asFlatVector(); + VELOX_CHECK_NOT_NULL(flatProbeKeyVector); + for (int j = 0; j < flatProbeKeyVector->size(); ++j, ++row) { + if (row % 100 < matchPct) { + flatProbeKeyVector->set(j, folly::Random::rand64(minKey, maxKey + 1)); + } else { + flatProbeKeyVector->set(j, maxKey + 1 + folly::Random::rand32()); + } + } + } + return probeInputs; } - const std::string& name() const override { - static const std::string connectorName{"IndexTableHandle"}; - return connectorName; + // Create index table with the given key and value inputs. + std::shared_ptr createIndexTable( + const RowVectorPtr& keyData, + const RowVectorPtr& valueData) { + auto keyType = std::dynamic_pointer_cast(keyData->type()); + VELOX_CHECK_GE(keyType->size(), 1); + auto valueType = + std::dynamic_pointer_cast(valueData->type()); + VELOX_CHECK_GE(valueType->size(), 1); + const auto numRows = keyData->size(); + VELOX_CHECK_EQ(numRows, valueData->size()); + + std::vector> hashers; + hashers.reserve(keyType->size()); + for (auto i = 0; i < keyType->size(); ++i) { + hashers.push_back(std::make_unique(keyType->childAt(i), i)); + } + + // Create the table. + auto table = HashTable::createForJoin( + std::move(hashers), + /*dependentTypes=*/valueType->children(), + /*allowDuplicates=*/true, + /*hasProbedFlag=*/false, + /*minTableSizeForParallelJoinBuild=*/1, + pool_.get()); + + // Insert data into the row container. + auto rowContainer = table->rows(); + std::vector decodedVectors; + for (auto& vector : keyData->children()) { + decodedVectors.emplace_back(*vector); + } + for (auto& vector : valueData->children()) { + decodedVectors.emplace_back(*vector); + } + + std::vector rows; + for (auto row = 0; row < numRows; ++row) { + auto* newRow = rowContainer->newRow(); + + for (auto col = 0; col < decodedVectors.size(); ++col) { + rowContainer->store(decodedVectors[col], row, newRow, col); + } + } + + // Build the table index. + table->prepareJoinTable({}, BaseHashTable::kNoSpillInputStartPartitionBit); + return std::make_shared( + std::move(keyType), std::move(valueType), std::move(table)); } - bool supportsIndexLookup() const override { - return true; + void createDuckDbTable( + const std::string& tableName, + const std::vector& data) { + // Change each column with prefix 'c' to simplify the duckdb table column + // naming. + std::vector columnNames; + columnNames.reserve(data[0]->type()->size()); + for (int i = 0; i < data[0]->type()->size(); ++i) { + columnNames.push_back(fmt::format("c{}", i)); + } + std::vector duckDbInputs; + duckDbInputs.reserve(data.size()); + for (const auto& dataVector : data) { + duckDbInputs.emplace_back( + makeRowVector(columnNames, dataVector->children())); + } + duckDbQueryRunner_.createTable(tableName, duckDbInputs); } - folly::dynamic serialize() const override { - folly::dynamic obj = folly::dynamic::object; - obj["name"] = name(); - obj["connectorId"] = connectorId(); - return obj; + // Makes index table handle with the specified index table and async lookup + // flag. + std::shared_ptr makeIndexTableHandle( + const std::shared_ptr& indexTable, + bool asyncLookup) { + return std::make_shared( + kTestIndexConnectorName, indexTable, asyncLookup); } - static std::shared_ptr create( - const folly::dynamic& obj, - void* context) { - return std::make_shared(obj["connectorId"].getString()); + // Makes index table scan node with the specified index table handle. + // @param outputType: the output schema of the index table scan node. + // @param scanNodeId: returns the plan node id of the index table scan node. + core::TableScanNodePtr makeIndexScanNode( + const std::shared_ptr& planNodeIdGenerator, + const std::shared_ptr indexTableHandle, + const RowTypePtr& outputType, + core::PlanNodeId& scanNodeId) { + auto planBuilder = PlanBuilder(planNodeIdGenerator); + auto indexTableScan = std::dynamic_pointer_cast( + PlanBuilder::TableScanBuilder(planBuilder) + .tableHandle(indexTableHandle) + .outputType(outputType) + .endTableScan() + .capturePlanNodeId(scanNodeId) + .planNode()); + VELOX_CHECK_NOT_NULL(indexTableScan); + return indexTableScan; } - static void registerSerDe() { - auto& registry = DeserializationWithContextRegistryForSharedPtr(); - registry.Register("IndexTableHandle", create); + // Makes output schema from the index table scan node with the specified + // column names. + RowTypePtr makeScanOutputType(std::vector outputNames) { + std::vector types; + for (int i = 0; i < outputNames.size(); ++i) { + if (valueType_->getChildIdxIfExists(outputNames[i]).has_value()) { + types.push_back(valueType_->findChild(outputNames[i])); + continue; + } + types.push_back(keyType_->findChild(outputNames[i])); + } + return ROW(std::move(outputNames), std::move(types)); } + + // Makes lookup join plan with the following parameters: + // @param indexScanNode: the index table scan node. + // @param probeVectors: the probe input vectors. + // @param outputColumns: the output column names of index lookup join. + // @param joinType: the join type of index lookup join. + // @param joinNodeId: returns the plan node id of the index lookup join node. + core::PlanNodePtr makeLookupPlan( + const std::shared_ptr& planNodeIdGenerator, + core::TableScanNodePtr indexScanNode, + const std::vector probeVectors, + const std::vector& outputColumns, + core::JoinType joinType, + core::PlanNodeId& joinNodeId) { + VELOX_CHECK_EQ(keyType_->size(), 1); + return PlanBuilder(planNodeIdGenerator) + .values(probeVectors) + .indexLookupJoin( + {{probeType_->nameOf(0)}}, + {"u0"}, + indexScanNode, + {}, + outputColumns, + joinType) + .capturePlanNodeId(joinNodeId) + .planNode(); + } + + const std::unique_ptr connectorCpuExecutor_{ + std::make_unique(128)}; + RowTypePtr keyType_; + RowTypePtr valueType_; + RowTypePtr tableType_; + RowTypePtr probeType_; }; -TEST_F(IndexLookupJoinTest, planNodeAndSerde) { - IndexTableHandle::registerSerDe(); +TEST_P(IndexLookupJoinTest, planNodeAndSerde) { + TestIndexTableHandle::registerSerDe(); - auto indexConnectorHandle = - std::make_shared("IndexConnector"); + auto indexConnectorHandle = std::make_shared( + kTestIndexConnectorName, nullptr, true); auto left = makeRowVector( {"t0", "t1", "t2"}, @@ -111,16 +335,16 @@ TEST_F(IndexLookupJoinTest, planNodeAndSerde) { auto planNodeIdGenerator = std::make_shared(); - auto planBuilder = exec::test::PlanBuilder(); + auto planBuilder = PlanBuilder(); auto nonIndexTableScan = std::dynamic_pointer_cast( - exec::test::PlanBuilder::TableScanBuilder(planBuilder) + PlanBuilder::TableScanBuilder(planBuilder) .outputType(std::dynamic_pointer_cast(right->type())) .endTableScan() .planNode()); VELOX_CHECK_NOT_NULL(nonIndexTableScan); auto indexTableScan = std::dynamic_pointer_cast( - exec::test::PlanBuilder::TableScanBuilder(planBuilder) + PlanBuilder::TableScanBuilder(planBuilder) .tableHandle(indexConnectorHandle) .outputType(std::dynamic_pointer_cast(right->type())) .endTableScan() @@ -143,7 +367,7 @@ TEST_F(IndexLookupJoinTest, planNodeAndSerde) { ASSERT_TRUE(indexLookupJoinNode->joinConditions().empty()); ASSERT_EQ( indexLookupJoinNode->lookupSource()->tableHandle()->connectorId(), - "IndexConnector"); + kTestIndexConnectorName); testSerde(plan); } @@ -164,7 +388,7 @@ TEST_F(IndexLookupJoinTest, planNodeAndSerde) { ASSERT_EQ(indexLookupJoinNode->joinConditions().size(), 1); ASSERT_EQ( indexLookupJoinNode->lookupSource()->tableHandle()->connectorId(), - "IndexConnector"); + kTestIndexConnectorName); testSerde(plan); } @@ -221,4 +445,406 @@ TEST_F(IndexLookupJoinTest, planNodeAndSerde) { "JoinNode requires at least one join key"); } } + +TEST_P(IndexLookupJoinTest, basic) { + struct { + int numKeys; + int numDuplicatePerKey; + int numBatches; + int numProbeRowsPerBatch; + int matchPct; + std::vector scanOutputColumns; + std::vector outputColumns; + core::JoinType joinType; + std::string duckDbVerifySql; + + std::string debugString() const { + return fmt::format( + "numKeys: {}, numDuplicatePerKey: {}, numBatches: {}, numProbeRowsPerBatch: {}, matchPct: {}, scanOutputColumns: {}, outputColumns: {}, joinType: {}, duckDbVerifySql: {}", + numKeys, + numDuplicatePerKey, + numBatches, + numProbeRowsPerBatch, + matchPct, + folly::join(",", scanOutputColumns), + folly::join(",", outputColumns), + core::joinTypeName(joinType), + duckDbVerifySql); + } + } testSettings[] = { + // Inner join. + // 10% match. + {100, + 1, + 10, + 100, + 10, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // 10% match with duplicates. + {100, + 4, + 10, + 100, + 10, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // 10% match with larger lookup table. + {500, + 1, + 10, + 100, + 10, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // 10% match + duplicate with larger lookup table. + {500, + 4, + 10, + 100, + 10, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // Empty lookup table. + {0, + 1, + 10, + 100, + 10, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // No match. + {500, + 4, + 10, + 100, + 0, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // 10% match with larger lookup table. + {500, + 1, + 10, + 100, + 10, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // very few (2%) match with larger lookup table. + {500, + 1, + 10, + 100, + 2, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // very few (2%) match + duplicate with larger lookup table. + {500, + 4, + 10, + 100, + 2, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // All matches with larger lookup table. + {500, + 1, + 10, + 100, + 2, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // All matches + duplicate with larger lookup table. + {500, + 4, + 10, + 100, + 2, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // No probe projection. + {500, + 1, + 10, + 100, + 2, + {"u0", "u1", "u2", "u3"}, + {"u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // No probe projection + duplicate with larger lookup table. + {500, + 4, + 10, + 100, + 2, + {"u0", "u1", "u2", "u3"}, + {"u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // Probe column reorder in output. + {500, + 4, + 10, + 100, + 2, + {"u0", "u1", "u2", "u3"}, + {"t2", "t1", "u1", "u2", "u3"}, + core::JoinType::kInner, + "SELECT t.c2, t.c1, u.c1, u.c2, u.c3 FROM t, u WHERE t.c0 = u.c0"}, + // Lookup column reorder in output. + {500, + 4, + 10, + 100, + 2, + {"u1", "u0", "u2"}, + {"t1", "u2", "u1", "t2"}, + core::JoinType::kInner, + "SELECT t.c1, u.c2, u.c1, t.c2 FROM t, u WHERE t.c0 = u.c0"}, + // Both sides reorder in output. + {500, + 4, + 10, + 100, + 2, + {"u1", "u0", "u2", "u3"}, + {"t2", "u2", "u3", "t1", "u1"}, + core::JoinType::kInner, + "SELECT t.c2, u.c2, u.c3, t.c1, u.c1 FROM t, u WHERE t.c0 = u.c0"}, + // With probe key colums. + {500, + 4, + 10, + 100, + 2, + {"u1", "u0", "u2", "u3"}, + {"t2", "u2", "u3", "t1", "u1", "t0"}, + core::JoinType::kInner, + "SELECT t.c2, u.c2, u.c3, t.c1, u.c1, t.c0 FROM t, u WHERE t.c0 = u.c0"}, + + // Left join. + // 10% match. + {100, + 1, + 10, + 100, + 10, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kLeft, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // 10% match with duplicates. + {100, + 4, + 10, + 100, + 10, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kLeft, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // 10% match with larger lookup table. + {500, + 1, + 10, + 100, + 10, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kLeft, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // 10% match + duplicate with larger lookup table. + {500, + 4, + 10, + 100, + 10, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kLeft, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + + // Empty lookup table. + {0, + 1, + 10, + 100, + 10, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kLeft, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // No match. + {500, + 4, + 10, + 100, + 0, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kLeft, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // 10% match with larger lookup table. + {500, + 1, + 10, + 100, + 10, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kLeft, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // very few (2%) match with larger lookup table. + {500, + 1, + 10, + 100, + 2, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kLeft, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // very few (2%) match + duplicate with larger lookup table. + {500, + 4, + 10, + 100, + 2, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kLeft, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // All matches with larger lookup table. + {500, + 1, + 10, + 100, + 2, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kLeft, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // All matches + duplicate with larger lookup table. + {500, + 4, + 10, + 100, + 2, + {"u0", "u1", "u2", "u3"}, + {"t1", "u1", "u2", "u3"}, + core::JoinType::kLeft, + "SELECT t.c1, u.c1, u.c2, u.c3 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // Probe column reorder in output. + {500, + 4, + 10, + 100, + 2, + {"u0", "u1", "u2", "u3"}, + {"t2", "t1", "u1", "u2", "u3"}, + core::JoinType::kLeft, + "SELECT t.c2, t.c1, u.c1, u.c2, u.c3 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // Lookup column reorder in output. + {500, + 4, + 10, + 100, + 2, + {"u1", "u0", "u2"}, + {"t1", "u2", "u1", "t2"}, + core::JoinType::kLeft, + "SELECT t.c1, u.c2, u.c1, t.c2 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // Both sides reorder in output. + {500, + 4, + 10, + 100, + 2, + {"u1", "u0", "u2", "u3"}, + {"t2", "u2", "u3", "t1", "u1"}, + core::JoinType::kLeft, + "SELECT t.c2, u.c2, u.c3, t.c1, u.c1 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + // With probe key colums. + {500, + 4, + 10, + 100, + 2, + {"u1", "u0", "u2", "u3"}, + {"t2", "u2", "u3", "t1", "u1", "t0"}, + core::JoinType::kLeft, + "SELECT t.c2, u.c2, u.c3, t.c1, u.c1, t.c0 FROM t LEFT JOIN u ON t.c0 = u.c0"}, + }; + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + RowVectorPtr keyData; + RowVectorPtr valueData; + RowVectorPtr tableData; + generateIndexTableData( + testData.numKeys, + testData.numDuplicatePerKey, + keyData, + valueData, + tableData); + const std::vector probeVectors = generateProbeTableInput( + testData.numBatches, + testData.numProbeRowsPerBatch, + keyData, + testData.matchPct); + + createDuckDbTable("t", probeVectors); + createDuckDbTable("u", {tableData}); + + const auto indexTable = createIndexTable(keyData, valueData); + const auto indexTableHandle = makeIndexTableHandle(indexTable, GetParam()); + auto planNodeIdGenerator = std::make_shared(); + core::PlanNodeId indexScanNodeId; + const auto indexScanNode = makeIndexScanNode( + planNodeIdGenerator, + indexTableHandle, + makeScanOutputType(testData.scanOutputColumns), + indexScanNodeId); + + core::PlanNodeId joinNodeId; + auto plan = makeLookupPlan( + planNodeIdGenerator, + indexScanNode, + probeVectors, + testData.outputColumns, + testData.joinType, + joinNodeId); + AssertQueryBuilder(duckDbQueryRunner_) + .plan(plan) + .assertResults(testData.duckDbVerifySql); + } +} } // namespace + +VELOX_INSTANTIATE_TEST_SUITE_P( + IndexLookupJoinTest, + IndexLookupJoinTest, + testing::ValuesIn({false, true})); +} // namespace fecebook::velox::exec::test