Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Add support to filter out plans that use Timestamp with Timezone #23179

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// clang-format off
#include "presto_cpp/main/types/PrestoToVeloxConnector.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "presto_cpp/main/common/Configs.h"
#include <velox/type/Filter.h>
#include "velox/core/QueryCtx.h"
#include "velox/exec/HashPartitionFunction.h"
Expand All @@ -23,6 +24,10 @@
#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/core/Expressions.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/core/ITypedExpr.h"
#include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h"

// clang-format on

#include <folly/String.h>
Expand Down Expand Up @@ -1756,7 +1761,7 @@ protocol::PlanNodeId toPartitionedOutputNodeId(const protocol::PlanNodeId& id) {

} // namespace

core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlanImpl(
const protocol::PlanFragment& fragment,
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
const protocol::TaskId& taskId) {
Expand Down Expand Up @@ -1924,6 +1929,124 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
return planFragment;
}

namespace {

bool rowHasTimestampWithTz(const RowTypePtr row) {
for (auto child : row->children()) {
if (isTimestampWithTimeZoneType(child)) {
return true;
}
}

return false;
}

bool exprHasTimestampWithTz(const std::vector<core::TypedExprPtr>& exprs) {
for (auto expr : exprs) {
if (isTimestampWithTimeZoneType(expr->type())) {
return true;
}
if (exprHasTimestampWithTz(expr->inputs())) {
return true;
}
}

return false;
}

} // namespace

bool planHasTimestampWithTimeZone(const core::PlanNodePtr planNode) {
// Do output types have timestamp with tz ?
if (rowHasTimestampWithTz(planNode->outputType())) {
return true;
}

// Is it a projectNode with timestamp with tz ?
if (auto projectNode =
std::dynamic_pointer_cast<const core::ProjectNode>(planNode)) {
if (!projectNode->projections().empty() &&
exprHasTimestampWithTz(projectNode->projections())) {
return true;
}
}

// Is it a filterNode with timestamp with tz ?
if (auto filterNode =
std::dynamic_pointer_cast<const core::FilterNode>(planNode)) {
if (filterNode->filter() &&
exprHasTimestampWithTz({filterNode->filter()})) {
return true;
}
}

// Is it one of the joinNodes with timestamp with tz ?
if (auto joinNode =
std::dynamic_pointer_cast<const core::AbstractJoinNode>(planNode)) {
Comment on lines +1984 to +1985
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can AbstractJoinNode be used standalone?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it would cover both HashJoin and MergeJoin.

if (joinNode->filter() && exprHasTimestampWithTz({joinNode->filter()})) {
return true;
}
}

if (auto joinNode =
std::dynamic_pointer_cast<const core::NestedLoopJoinNode>(planNode)) {
if (joinNode->joinCondition() &&
exprHasTimestampWithTz({joinNode->joinCondition()})) {
return true;
}
}

// Does aggregation use timestamp with tz ?
if (auto aggNode =
std::dynamic_pointer_cast<const core::AggregationNode>(planNode)) {
for (auto agg : aggNode->aggregates()) {
if (agg.call &&
(agg.call->type() && isTimestampWithTimeZoneType(agg.call->type()) ||
exprHasTimestampWithTz(agg.call->inputs()))) {
return true;
}
}
}

// Is it tablescan with remaining filter ?
if (auto tableScanNode =
std::dynamic_pointer_cast<const core::TableScanNode>(planNode)) {
if (auto hiveTableHandle =
std::dynamic_pointer_cast<connector::hive::HiveTableHandle>(
tableScanNode->tableHandle())) {
if (hiveTableHandle->remainingFilter() &&
exprHasTimestampWithTz({hiveTableHandle->remainingFilter()})) {
return true;
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AggregationNode::aggregates_::call is a CallTypedExprPtr that can contain a LambdaTypedExprPtr as input. Should we check it too?


// Do source plans have timestamp with tz ?
for (auto plan : planNode->sources()) {
if (planHasTimestampWithTimeZone(plan)) {
return true;
}
}

return false;
}

core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
const protocol::PlanFragment& fragment,
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
const protocol::TaskId& taskId) {
auto planFragment = toVeloxQueryPlanImpl(fragment, tableWriteInfo, taskId);

if (SystemConfig::instance()
->optionalProperty<bool>(kFailOnTimestampWithTimezone)
.value_or(false)) {
VELOX_CHECK(
!planHasTimestampWithTimeZone(planFragment.planNode),
"Plan uses Timestamp with Timezone");
}
return planFragment;
}

core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan(
const std::shared_ptr<const protocol::OutputNode>& node,
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,18 @@ class VeloxQueryPlanConverterBase {
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
const protocol::TaskId& taskId);

/// Do we fail if we encounter timestamp with timezone in the plan ?
/// This is a config to reject queries which use this type due to issue
/// https://github.com/facebookincubator/velox/issues/10338
static constexpr std::string_view kFailOnTimestampWithTimezone{
"fail_on_timestamp_with_timezone"};

protected:
velox::core::PlanFragment toVeloxQueryPlanImpl(
const protocol::PlanFragment& fragment,
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
const protocol::TaskId& taskId);

virtual velox::core::PlanNodePtr toVeloxQueryPlan(
const std::shared_ptr<const protocol::RemoteSourceNode>& node,
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
Expand Down Expand Up @@ -271,5 +282,8 @@ class VeloxBatchQueryPlanConverter : public VeloxQueryPlanConverterBase {
const std::shared_ptr<std::string> broadcastBasePath_;
};

// Visible for testing
bool planHasTimestampWithTimeZone(const velox::core::PlanNodePtr planNode);

void registerPrestoPlanNodeSerDe();
} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
#include "presto_cpp/main/types/PrestoToVeloxConnector.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h"
#include "velox/parse/TypeResolver.h"

namespace fs = boost::filesystem;

Expand All @@ -51,7 +56,7 @@ std::string getDataPath(const std::string& fileName) {
// file not found. Fixing the path so that we can trigger these tests from
// CLion.
boost::algorithm::replace_all(currentPath, "cmake-build-release/", "");
boost::algorithm::replace_all(currentPath, "cmake-build-debug/", "");
boost::algorithm::replace_all(currentPath, "cmake-build-debug6/", "");

return currentPath + "/data/" + fileName;
}
Expand Down Expand Up @@ -105,6 +110,9 @@ class PlanConverterTest : public ::testing::Test {
std::make_unique<HivePrestoToVeloxConnector>("hive"));
registerPrestoToVeloxConnector(
std::make_unique<HivePrestoToVeloxConnector>("hive-plus"));
functions::prestosql::registerAllScalarFunctions();
aggregate::prestosql::registerAllAggregateFunctions();
parse::registerTypeResolver();
}

void TearDown() override {
Expand Down Expand Up @@ -220,3 +228,89 @@ TEST_F(PlanConverterTest, batchPlanConversion) {
std::dynamic_pointer_cast<const operators::ShuffleReadNode>(curNode);
ASSERT_NE(shuffleReadNode, nullptr);
}

TEST_F(PlanConverterTest, timestampWithTimeZone) {
auto makeEmptyRowVector = [&](const RowTypePtr& rowType) {
return std::make_shared<RowVector>(
memory::deprecatedAddDefaultLeafMemoryPool().get(),
rowType,
nullptr,
0,
std::vector<VectorPtr>{});
};
// Ensure Physically equivalent types arent filtered out.
auto plan = exec::test::PlanBuilder()
.startTableScan()
.outputType(ROW({"a"}, {BIGINT()}))
.endTableScan()
.singleAggregation({}, {"min(a)"})
.planNode();

ASSERT_FALSE(planHasTimestampWithTimeZone(plan));

plan = exec::test::PlanBuilder()
.startTableScan()
.outputType(ROW({"a"}, {TIMESTAMP_WITH_TIME_ZONE()}))
.endTableScan()
.singleAggregation({}, {"min(a)"})
.planNode();

ASSERT_TRUE(planHasTimestampWithTimeZone(plan));

auto testHashJoin = [&](std::vector<TypePtr>&& probeTypes,
std::vector<TypePtr>&& buildTypes) {
auto probe =
makeEmptyRowVector(ROW({"t0", "t1", "t2"}, std::move(probeTypes)));

auto build =
makeEmptyRowVector(ROW({"u0", "u1", "u2"}, std::move(buildTypes)));

auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto plan = exec::test::PlanBuilder(planNodeIdGenerator)
.values({probe})
.hashJoin(
{"t0"},
{"u0"},
exec::test::PlanBuilder(planNodeIdGenerator)
.values({build})
.planNode(),
"t1 > u1",
{"t0", "t1", "u2", "t2"},
core::JoinType::kInner)
.planNode();
return planHasTimestampWithTimeZone(plan);
};

ASSERT_FALSE(testHashJoin(
{BIGINT(), INTEGER(), VARCHAR()}, {BIGINT(), INTEGER(), BIGINT()}));
ASSERT_TRUE(testHashJoin(
{BIGINT(), INTEGER(), VARCHAR()},
{BIGINT(), INTEGER(), TIMESTAMP_WITH_TIME_ZONE()}));

// Project node
auto valuesVector = makeEmptyRowVector(
ROW({"c0", "c1"}, {BIGINT(), TIMESTAMP_WITH_TIME_ZONE()}));
plan = exec::test::PlanBuilder()
.values({valuesVector})
.project({"c0 * 10", "c1"})
.planNode();

ASSERT_TRUE(planHasTimestampWithTimeZone(plan));

// Filter node
plan = exec::test::PlanBuilder()
.values({valuesVector})
.filter("c0 > 100")
.planNode();

ASSERT_TRUE(planHasTimestampWithTimeZone(plan));

// Aggregation Node
plan = exec::test::PlanBuilder()
.values({valuesVector})
.partialAggregation({"c0"}, {"count(1)"})
.finalAggregation()
.planNode();

ASSERT_TRUE(planHasTimestampWithTimeZone(plan));
}
Loading