Skip to content

Commit

Permalink
Provide PlanBuilder convenience function
Browse files Browse the repository at this point in the history
Summary:
Add a PlanBuilder.tableScan helper function to reduce the amount of
boilerplate when pulling data out of the TPC-H connector.

Differential Revision: D36303299

fbshipit-source-id: 9ac4226185a99a0a53fbea9abfca7f48451ad941
  • Loading branch information
pedroerp authored and facebook-github-bot committed May 11, 2022
1 parent 43308b0 commit 80a4e00
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 33 deletions.
50 changes: 17 additions & 33 deletions velox/connectors/tpch/tests/TpchConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,31 +50,15 @@ class TpchConnectorTest : public exec::test::OperatorTestBase {
return exec::Split(std::make_shared<TpchConnectorSplit>(kTpchConnectorId));
}

// Helper function to create 1:1 assignments maps based on output type.
auto defaultAssignments(const RowTypePtr& outputType) const {
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
assignmentsMap;

for (const auto& columnName : outputType->names()) {
assignmentsMap.emplace(
columnName, std::make_shared<TpchColumnHandle>(columnName));
}
return assignmentsMap;
}

void runScaleFactorTest(size_t scaleFactor);
};

// Simple scan of first 5 rows of "nation".
TEST_F(TpchConnectorTest, simple) {
auto outputType =
ROW({"n_nationkey", "n_name", "n_regionkey", "n_comment"},
{BIGINT(), VARCHAR(), BIGINT(), VARCHAR()});
auto plan = PlanBuilder()
.tableScan(
outputType,
std::make_shared<TpchTableHandle>(Table::TBL_NATION),
defaultAssignments(outputType))
{"n_nationkey", "n_name", "n_regionkey", "n_comment"})
.limit(0, 5, false)
.planNode();

Expand Down Expand Up @@ -106,13 +90,11 @@ TEST_F(TpchConnectorTest, simple) {

// Extract single column from "nation".
TEST_F(TpchConnectorTest, singleColumn) {
auto outputType = ROW({"n_name"}, {VARCHAR()});
auto plan = PlanBuilder()
.tableScan(
outputType,
std::make_shared<TpchTableHandle>(Table::TBL_NATION),
defaultAssignments(outputType))
.planNode();
auto plan =
PlanBuilder()
.tableScan(
std::make_shared<TpchTableHandle>(Table::TBL_NATION), {"n_name"})
.planNode();

auto output = getResults(plan, {makeTpchSplit()});
auto expected = makeRowVector({makeFlatVector<StringView>({
Expand Down Expand Up @@ -183,15 +165,17 @@ TEST_F(TpchConnectorTest, simpleAggregation) {
}

TEST_F(TpchConnectorTest, unknownColumn) {
auto outputType = ROW({"does_not_exist"}, {VARCHAR()});
auto plan = PlanBuilder()
.tableScan(
outputType,
std::make_shared<TpchTableHandle>(Table::TBL_NATION),
defaultAssignments(outputType))
.planNode();

EXPECT_THROW(getResults(plan, {makeTpchSplit()}), VeloxRuntimeError);
EXPECT_THROW(
{
PlanBuilder()
.tableScan(
std::make_shared<TpchTableHandle>(Table::TBL_NATION),
{"does_not_exist"})
.planNode();
},
VeloxUserError);

// EXPECT_THROW(getResults(plan, {makeTpchSplit()}), VeloxUserError);
}

} // namespace
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ target_link_libraries(
velox_dwio_dwrf_writer
velox_dwrf_test_utils
velox_hive_connector
velox_tpch_connector
velox_presto_serializer
velox_functions_prestosql)
22 changes: 22 additions & 0 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/common/memory/Memory.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/tpch/TpchConnector.h"
#include "velox/duckdb/conversion/DuckParser.h"
#include "velox/exec/Aggregate.h"
#include "velox/exec/HashPartitionFunction.h"
Expand Down Expand Up @@ -424,6 +425,27 @@ PlanBuilder& PlanBuilder::tableScan(
return *this;
}

/// Adds a TableScanNode over a TPC-H table, deriving both the output row type
/// and the 1:1 column assignments from the requested column names.
///
/// @param tpchTableHandle Handle identifying the TPC-H table to scan.
/// @param columnNames Columns to project out. The vector is consumed: it is
/// moved into the resulting row type.
/// @return Reference to this builder, as returned by the generic tableScan()
/// overload this call delegates to.
PlanBuilder& PlanBuilder::tableScan(
    const std::shared_ptr<connector::tpch::TpchTableHandle>& tpchTableHandle,
    std::vector<std::string>&& columnNames) {
  const auto numColumns = columnNames.size();

  std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
      columnHandles;
  columnHandles.reserve(numColumns);

  std::vector<TypePtr> columnTypes;
  columnTypes.reserve(numColumns);

  for (size_t i = 0; i < numColumns; ++i) {
    const auto& name = columnNames[i];
    // Each requested column maps to a TPC-H column handle of the same name.
    columnHandles.emplace(
        name, std::make_shared<connector::tpch::TpchColumnHandle>(name));
    // The column type is looked up in the schema of the handle's table.
    columnTypes.emplace_back(
        resolveTpchColumn(tpchTableHandle->getTable(), name));
  }

  // Names are only moved after the loop, once they are no longer read.
  auto outputType = ROW(std::move(columnNames), std::move(columnTypes));
  return tableScan(outputType, tpchTableHandle, columnHandles);
}

PlanBuilder& PlanBuilder::values(
const std::vector<RowVectorPtr>& values,
bool parallelizable) {
Expand Down
15 changes: 15 additions & 0 deletions velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ namespace facebook::velox::core {
class IExpr;
}

namespace facebook::velox::connector::tpch {
class TpchTableHandle;
}

namespace facebook::velox::exec::test {

/// Generates unique sequential plan node IDs starting with zero or specified
Expand Down Expand Up @@ -145,6 +149,7 @@ class PlanBuilder {

/// Add a TableScanNode using a connector-specific table handle and
/// assignments. Supports any connector, not just Hive connector.
///
/// @param outputType List of column names and types to project out. Column
/// names should match the keys in the 'assignments' map. The 'assignments'
/// map may contain more columns than 'outputType' if some columns are only
Expand All @@ -156,7 +161,17 @@ class PlanBuilder {
std::string,
std::shared_ptr<connector::ColumnHandle>>& assignments);

/// Add a TableScanNode to scan a TPC-H table.
///
/// The output row type and the column assignments are derived from
/// 'columnNames': each name is assigned a TPC-H column handle of the same
/// name, and its type is resolved via resolveTpchColumn() against the table
/// referenced by 'tpchTableHandle'. resolveTpchColumn() is documented to throw
/// if a column does not exist in the table.
///
/// @param tpchTableHandle The handle that specifies the target TPC-H table
/// and scale factor.
/// @param columnNames The columns to be returned from that table. Taken by
/// rvalue reference; the vector is moved into the output row type, so callers
/// should not rely on its contents afterwards.
PlanBuilder& tableScan(
const std::shared_ptr<connector::tpch::TpchTableHandle>& tpchTableHandle,
std::vector<std::string>&& columnNames);

/// Add a ValuesNode using specified data.
///
/// @param values The data to use.
/// @param parallelizable If true, ValuesNode can run multi-threaded, in which
/// case it will produce duplicate data from each thread, e.g. each thread
Expand Down
4 changes: 4 additions & 0 deletions velox/tpch/gen/TpchGen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ RowTypePtr getTableSchema(Table table) {
return nullptr; // make gcc happy.
}

// Looks up the type of 'columnName' in the schema of 'table' by delegating to
// the schema's findChild().
TypePtr resolveTpchColumn(Table table, const std::string& columnName) {
  const auto schema = getTableSchema(table);
  return schema->findChild(columnName);
}

RowVectorPtr genTpchOrders(
size_t maxRows,
size_t offset,
Expand Down
4 changes: 4 additions & 0 deletions velox/tpch/gen/TpchGen.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ size_t getRowCount(Table table, size_t scaleFactor);
/// Returns the schema (RowType) for a particular TPC-H table.
RowTypePtr getTableSchema(Table table);

/// Returns the type of a particular table:column pair. Throws if `columnName`
/// does not exist in `table`.
TypePtr resolveTpchColumn(Table table, const std::string& columnName);

/// Returns a row vector containing at most `maxRows` rows of the "orders"
/// table, starting at `offset`, and given the scale factor. The row vector
/// returned has the following schema:
Expand Down

0 comments on commit 80a4e00

Please sign in to comment.