From 5c38d5a89265e40dfd7048ec3acaebcf25956356 Mon Sep 17 00:00:00 2001 From: Krishna Pai Date: Wed, 10 Jul 2024 18:05:05 -0700 Subject: [PATCH] Add support to filter out plans that use Timestamp with Timezone --- .../main/types/PrestoToVeloxQueryPlan.cpp | 106 +++++++++++++++++- .../main/types/PrestoToVeloxQueryPlan.h | 14 +++ .../main/types/tests/PlanConverterTest.cpp | 85 ++++++++++++++ 3 files changed, 204 insertions(+), 1 deletion(-) diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp index f84ed68c18224..a1e42c462710b 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp @@ -23,6 +23,9 @@ #include "velox/vector/ComplexVector.h" #include "velox/vector/FlatVector.h" #include "velox/core/Expressions.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/core/ITypedExpr.h" +#include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h" // clang-format on #include @@ -1756,7 +1759,7 @@ protocol::PlanNodeId toPartitionedOutputNodeId(const protocol::PlanNodeId& id) { } // namespace -core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( +core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlanImpl( const protocol::PlanFragment& fragment, const std::shared_ptr& tableWriteInfo, const protocol::TaskId& taskId) { @@ -1924,6 +1927,107 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( return planFragment; } +namespace { + +bool rowHasTimestampWithTz(const RowTypePtr row) { + for (auto child : row->children()) { + if (isTimestampWithTimeZoneType(child)) { + return true; + } + } + + return false; +} + +bool exprHasTimestampWithTz(const std::vector& exprs) { + for (auto expr : exprs) { + if (expr->type()->kindEquals(TIMESTAMP_WITH_TIME_ZONE())) { + return true; + } + if (exprHasTimestampWithTz(expr->inputs())) { + return true; + } + } + + return false; +} + +} // namespace + +bool planHasTimestampWithTimeZone(const core::PlanNodePtr planNode) { + // Do output types have timestamp with tz ? + if (rowHasTimestampWithTz(planNode->outputType())) { + return true; + } + + // Is it a projectNode with timestamp with tz ? + if (auto projectNode = + std::dynamic_pointer_cast(planNode)) { + if (exprHasTimestampWithTz(projectNode->projections())) { + return true; + } + } + + // Is it a filterNode with timestamp with tz ? + if (auto filterNode = + std::dynamic_pointer_cast(planNode)) { + if (exprHasTimestampWithTz({filterNode->filter()})) { + return true; + } + } + + // Is it one of the joinNodes with timestamp with tz ? + if (auto joinNode = + std::dynamic_pointer_cast(planNode)) { + if (joinNode->filter() && exprHasTimestampWithTz({joinNode->filter()})) { + return true; + } + } + + if (auto joinNode = + std::dynamic_pointer_cast(planNode)) { + if (exprHasTimestampWithTz({joinNode->joinCondition()})) { + return true; + } + } + + // Is it tablescan with remaining filter ? + if (auto tableScanNode = + std::dynamic_pointer_cast(planNode)) { + if (auto hiveTableHandle = + std::dynamic_pointer_cast( + tableScanNode->tableHandle())) { + if (hiveTableHandle->remainingFilter() && + exprHasTimestampWithTz({hiveTableHandle->remainingFilter()})) { + return true; + } + } + } + + // Do source plans have timestamp with tz ? + for (auto plan : planNode->sources()) { + if (planHasTimestampWithTimeZone(plan)) { + return true; + } + } + + return false; +} + +core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( + const protocol::PlanFragment& fragment, + const std::shared_ptr& tableWriteInfo, + const protocol::TaskId& taskId) { + auto planFragment = toVeloxQueryPlanImpl(fragment, tableWriteInfo, taskId); + + if (queryCtx_->queryConfig().get(kFailOnTimestampWithTimezone, true)) { + VELOX_CHECK( + planHasTimestampWithTimeZone(planFragment.planNode), + "Plan uses Timestamp with Timezone"); + } + return planFragment; +} + core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan( const std::shared_ptr& node, const std::shared_ptr& tableWriteInfo, diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.h b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.h index af17ad6e1b10f..c67b264b07c2a 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.h +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.h @@ -48,7 +48,18 @@ class VeloxQueryPlanConverterBase { const std::shared_ptr& tableWriteInfo, const protocol::TaskId& taskId); + /// Do we fail if we encounter timestamp with timezone in the plan ? + /// This is a config to reject queries which use this type due to issue + /// https://github.com/facebookincubator/velox/issues/10338 + static constexpr const char* kFailOnTimestampWithTimezone = + "fail_on_timestamp_with_timezone"; + protected: + velox::core::PlanFragment toVeloxQueryPlanImpl( + const protocol::PlanFragment& fragment, + const std::shared_ptr& tableWriteInfo, + const protocol::TaskId& taskId); + virtual velox::core::PlanNodePtr toVeloxQueryPlan( const std::shared_ptr& node, const std::shared_ptr& tableWriteInfo, @@ -271,5 +282,8 @@ class VeloxBatchQueryPlanConverter : public VeloxQueryPlanConverterBase { const std::shared_ptr broadcastBasePath_; }; +// Visible for testing +bool planHasTimestampWithTimeZone(const velox::core::PlanNodePtr planNode); + void registerPrestoPlanNodeSerDe(); } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp index f0b351f42cebc..1f4d6c65be75f 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp @@ -24,7 +24,12 @@ #include "presto_cpp/main/types/PrestoToVeloxConnector.h" #include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h" #include "velox/connectors/hive/TableHandle.h" +#include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" +#include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h" +#include "velox/parse/TypeResolver.h" namespace fs = boost::filesystem; @@ -105,6 +110,9 @@ class PlanConverterTest : public ::testing::Test { std::make_unique("hive")); registerPrestoToVeloxConnector( std::make_unique("hive-plus")); + functions::prestosql::registerAllScalarFunctions(); + aggregate::prestosql::registerAllAggregateFunctions(); + parse::registerTypeResolver(); } void TearDown() override { @@ -220,3 +228,80 @@ TEST_F(PlanConverterTest, batchPlanConversion) { std::dynamic_pointer_cast(curNode); ASSERT_NE(shuffleReadNode, nullptr); } + +TEST_F(PlanConverterTest, timestampWithTimeZone) { + auto makeEmptyRowVector = [&](const RowTypePtr& rowType) { + return std::make_shared( + memory::deprecatedAddDefaultLeafMemoryPool().get(), + rowType, + nullptr, + 0, + std::vector{}); + }; + // Ensure Physically equivalent types arent filtered out. + auto plan = exec::test::PlanBuilder() + .startTableScan() + .outputType(ROW({"a"}, {BIGINT()})) + .endTableScan() + .singleAggregation({}, {"min(a)"}) + .planNode(); + + ASSERT_FALSE(planHasTimestampWithTimeZone(plan)); + + plan = exec::test::PlanBuilder() + .startTableScan() + .outputType(ROW({"a"}, {TIMESTAMP_WITH_TIME_ZONE()})) + .endTableScan() + .singleAggregation({}, {"min(a)"}) + .planNode(); + + ASSERT_TRUE(planHasTimestampWithTimeZone(plan)); + + auto testHashJoin = [&](std::vector&& probeTypes, + std::vector&& buildTypes) { + auto probe = + makeEmptyRowVector(ROW({"t0", "t1", "t2"}, std::move(probeTypes))); + + auto build = + makeEmptyRowVector(ROW({"u0", "u1", "u2"}, std::move(buildTypes))); + + auto planNodeIdGenerator = std::make_shared(); + auto plan = exec::test::PlanBuilder(planNodeIdGenerator) + .values({probe}) + .hashJoin( + {"t0"}, + {"u0"}, + exec::test::PlanBuilder(planNodeIdGenerator) + .values({build}) + .planNode(), + "t1 > u1", + {"t0", "t1", "u2", "t2"}, + core::JoinType::kInner) + .planNode(); + return planHasTimestampWithTimeZone(plan); + }; + + ASSERT_FALSE(testHashJoin( + {BIGINT(), INTEGER(), VARCHAR()}, {BIGINT(), INTEGER(), BIGINT()})); + ASSERT_TRUE(testHashJoin( + {BIGINT(), INTEGER(), VARCHAR()}, + {BIGINT(), INTEGER(), TIMESTAMP_WITH_TIME_ZONE()})); + + // Project node + auto valuesVector = makeEmptyRowVector( + ROW({"c0", "c1"}, {BIGINT(), TIMESTAMP_WITH_TIME_ZONE()})); + plan = exec::test::PlanBuilder() + .values({valuesVector}) + .project({"c0 * 10", "c1"}) + .planNode(); + + ASSERT_TRUE(planHasTimestampWithTimeZone(plan)); + + // Filter node + plan = exec::test::PlanBuilder() + .values({valuesVector}) + .filter("c0 > 100") + .planNode(); + + ASSERT_TRUE(planHasTimestampWithTimeZone(plan)); +}