Skip to content

Commit

Permalink
Add support to filter out plans that use Timestamp with Timezone
Browse files Browse the repository at this point in the history
  • Loading branch information
kgpai committed Jul 11, 2024
1 parent affcc94 commit 5c38d5a
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/core/Expressions.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/core/ITypedExpr.h"
#include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h"
// clang-format on

#include <folly/String.h>
Expand Down Expand Up @@ -1756,7 +1759,7 @@ protocol::PlanNodeId toPartitionedOutputNodeId(const protocol::PlanNodeId& id) {

} // namespace

core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlanImpl(
const protocol::PlanFragment& fragment,
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
const protocol::TaskId& taskId) {
Expand Down Expand Up @@ -1924,6 +1927,107 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
return planFragment;
}

namespace {

/// Returns true if any direct child type of 'row' is TIMESTAMP WITH TIME ZONE.
///
/// NOTE(review): only the top-level columns are inspected; a TIMESTAMP WITH
/// TIME ZONE nested inside a complex column type is not detected here.
///
/// @param row Row type to inspect. Taken by const reference to avoid an
/// atomic ref-count increment on every call.
bool rowHasTimestampWithTz(const RowTypePtr& row) {
  // Bind by reference: copying each child would bump its ref-count for
  // nothing.
  for (const auto& child : row->children()) {
    if (isTimestampWithTimeZoneType(child)) {
      return true;
    }
  }

  return false;
}

/// Returns true if any expression in 'exprs', or any expression reachable
/// through its inputs, has type TIMESTAMP WITH TIME ZONE.
///
/// The type test uses isTimestampWithTimeZoneType() (the same predicate
/// rowHasTimestampWithTz() uses) rather than kindEquals(): kindEquals()
/// compares type kinds only, so a type that is merely physically equivalent
/// to TIMESTAMP WITH TIME ZONE would be reported as a match.
///
/// @param exprs Expressions to inspect. Null entries are skipped.
bool exprHasTimestampWithTz(const std::vector<core::TypedExprPtr>& exprs) {
  for (const auto& expr : exprs) {
    // Optional expressions (e.g. an absent join condition) may be null.
    if (expr == nullptr) {
      continue;
    }
    if (isTimestampWithTimeZoneType(expr->type())) {
      return true;
    }
    // Recurse into the sub-expressions.
    if (exprHasTimestampWithTz(expr->inputs())) {
      return true;
    }
  }

  return false;
}

} // namespace

/// Returns true if 'planNode' or any plan node reachable through its sources
/// uses TIMESTAMP WITH TIME ZONE.
///
/// For every node the output row type is checked. In addition, the
/// expressions carried by the following node kinds are checked:
///  - ProjectNode: projections
///  - FilterNode: filter
///  - AbstractJoinNode: optional join filter
///  - NestedLoopJoinNode: optional join condition
///  - TableScanNode with a HiveTableHandle: optional remaining filter
///
/// @param planNode Root of the (sub)plan to inspect.
bool planHasTimestampWithTimeZone(const core::PlanNodePtr planNode) {
  // Do output types have timestamp with tz ?
  if (rowHasTimestampWithTz(planNode->outputType())) {
    return true;
  }

  // Is it a projectNode with timestamp with tz ?
  if (auto projectNode =
          std::dynamic_pointer_cast<const core::ProjectNode>(planNode)) {
    if (exprHasTimestampWithTz(projectNode->projections())) {
      return true;
    }
  }

  // Is it a filterNode with timestamp with tz ?
  if (auto filterNode =
          std::dynamic_pointer_cast<const core::FilterNode>(planNode)) {
    if (exprHasTimestampWithTz({filterNode->filter()})) {
      return true;
    }
  }

  // Is it one of the joinNodes with timestamp with tz ?
  // The join filter is optional, so guard against null.
  if (auto joinNode =
          std::dynamic_pointer_cast<const core::AbstractJoinNode>(planNode)) {
    if (joinNode->filter() && exprHasTimestampWithTz({joinNode->filter()})) {
      return true;
    }
  }

  // The join condition is checked for null just like the join filter above,
  // so a nested loop join without a condition is not dereferenced.
  if (auto nestedLoopJoinNode =
          std::dynamic_pointer_cast<const core::NestedLoopJoinNode>(
              planNode)) {
    if (nestedLoopJoinNode->joinCondition() &&
        exprHasTimestampWithTz({nestedLoopJoinNode->joinCondition()})) {
      return true;
    }
  }

  // Is it tablescan with remaining filter ?
  if (auto tableScanNode =
          std::dynamic_pointer_cast<const core::TableScanNode>(planNode)) {
    if (auto hiveTableHandle =
            std::dynamic_pointer_cast<connector::hive::HiveTableHandle>(
                tableScanNode->tableHandle())) {
      if (hiveTableHandle->remainingFilter() &&
          exprHasTimestampWithTz({hiveTableHandle->remainingFilter()})) {
        return true;
      }
    }
  }

  // Do source plans have timestamp with tz ?
  for (const auto& source : planNode->sources()) {
    if (planHasTimestampWithTimeZone(source)) {
      return true;
    }
  }

  return false;
}

/// Converts a Presto plan fragment into a Velox plan fragment and, when the
/// 'kFailOnTimestampWithTimezone' query config is enabled (it is treated as
/// enabled when unset), rejects the converted plan if it uses TIMESTAMP WITH
/// TIME ZONE.
///
/// @param fragment Presto plan fragment to convert.
/// @param tableWriteInfo Table write information forwarded to the conversion.
/// @param taskId Id of the task the fragment belongs to.
/// @return The converted Velox plan fragment.
/// @throws A Velox check failure with message "Plan uses Timestamp with
/// Timezone" if the check is enabled and the plan uses the type.
core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
    const protocol::PlanFragment& fragment,
    const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
    const protocol::TaskId& taskId) {
  auto planFragment = toVeloxQueryPlanImpl(fragment, tableWriteInfo, taskId);

  if (queryCtx_->queryConfig().get<bool>(kFailOnTimestampWithTimezone, true)) {
    // VELOX_CHECK fails when its condition is false, so the condition must
    // state the accepted case: the plan does NOT use the type.
    VELOX_CHECK(
        !planHasTimestampWithTimeZone(planFragment.planNode),
        "Plan uses Timestamp with Timezone");
  }
  return planFragment;
}

core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan(
const std::shared_ptr<const protocol::OutputNode>& node,
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,18 @@ class VeloxQueryPlanConverterBase {
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
const protocol::TaskId& taskId);

/// Name of the query config property that controls whether plan conversion
/// fails when the converted plan uses TIMESTAMP WITH TIME ZONE.
/// The property exists to reject queries which use this type due to issue
/// https://github.com/facebookincubator/velox/issues/10338
static constexpr const char* kFailOnTimestampWithTimezone =
"fail_on_timestamp_with_timezone";

protected:
/// Performs the actual conversion of a Presto plan fragment into a Velox
/// plan fragment. The public toVeloxQueryPlan() overload taking the same
/// arguments calls this and then applies the TIMESTAMP WITH TIME ZONE check
/// to the result.
velox::core::PlanFragment toVeloxQueryPlanImpl(
const protocol::PlanFragment& fragment,
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
const protocol::TaskId& taskId);

virtual velox::core::PlanNodePtr toVeloxQueryPlan(
const std::shared_ptr<const protocol::RemoteSourceNode>& node,
const std::shared_ptr<protocol::TableWriteInfo>& tableWriteInfo,
Expand Down Expand Up @@ -271,5 +282,8 @@ class VeloxBatchQueryPlanConverter : public VeloxQueryPlanConverterBase {
const std::shared_ptr<std::string> broadcastBasePath_;
};

// Returns true if 'planNode' or any of its (transitive) source nodes uses
// TIMESTAMP WITH TIME ZONE, either in an output type or in one of the
// expressions the implementation inspects (projections, filters, join
// filters/conditions, Hive remaining filter).
// Visible for testing.
bool planHasTimestampWithTimeZone(const velox::core::PlanNodePtr planNode);

void registerPrestoPlanNodeSerDe();
} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
#include "presto_cpp/main/types/PrestoToVeloxConnector.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/functions/prestosql/types/TimestampWithTimeZoneType.h"
#include "velox/parse/TypeResolver.h"

namespace fs = boost::filesystem;

Expand Down Expand Up @@ -105,6 +110,9 @@ class PlanConverterTest : public ::testing::Test {
std::make_unique<HivePrestoToVeloxConnector>("hive"));
registerPrestoToVeloxConnector(
std::make_unique<HivePrestoToVeloxConnector>("hive-plus"));
functions::prestosql::registerAllScalarFunctions();
aggregate::prestosql::registerAllAggregateFunctions();
parse::registerTypeResolver();
}

void TearDown() override {
Expand Down Expand Up @@ -220,3 +228,80 @@ TEST_F(PlanConverterTest, batchPlanConversion) {
std::dynamic_pointer_cast<const operators::ShuffleReadNode>(curNode);
ASSERT_NE(shuffleReadNode, nullptr);
}

// Verifies that planHasTimestampWithTimeZone() detects TIMESTAMP WITH TIME
// ZONE in table scan, hash join, project and filter plans, and does not flag
// plans that only use other types.
TEST_F(PlanConverterTest, timestampWithTimeZone) {
  // Keep the leaf pool alive for the whole test: the row vectors created
  // below are handed only a raw pointer to it, so a temporary pool would be
  // released while the vectors are still in use.
  auto pool = memory::deprecatedAddDefaultLeafMemoryPool();
  auto makeEmptyRowVector = [&](const RowTypePtr& rowType) {
    return std::make_shared<RowVector>(
        pool.get(), rowType, nullptr, 0, std::vector<VectorPtr>{});
  };

  // Ensure physically equivalent types aren't filtered out.
  auto plan = exec::test::PlanBuilder()
                  .startTableScan()
                  .outputType(ROW({"a"}, {BIGINT()}))
                  .endTableScan()
                  .singleAggregation({}, {"min(a)"})
                  .planNode();

  ASSERT_FALSE(planHasTimestampWithTimeZone(plan));

  plan = exec::test::PlanBuilder()
             .startTableScan()
             .outputType(ROW({"a"}, {TIMESTAMP_WITH_TIME_ZONE()}))
             .endTableScan()
             .singleAggregation({}, {"min(a)"})
             .planNode();

  ASSERT_TRUE(planHasTimestampWithTimeZone(plan));

  // Builds an inner hash join over empty probe/build inputs of the given
  // column types and reports whether the plan is flagged.
  auto testHashJoin = [&](std::vector<TypePtr>&& probeTypes,
                          std::vector<TypePtr>&& buildTypes) {
    auto probe =
        makeEmptyRowVector(ROW({"t0", "t1", "t2"}, std::move(probeTypes)));

    auto build =
        makeEmptyRowVector(ROW({"u0", "u1", "u2"}, std::move(buildTypes)));

    auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
    auto plan = exec::test::PlanBuilder(planNodeIdGenerator)
                    .values({probe})
                    .hashJoin(
                        {"t0"},
                        {"u0"},
                        exec::test::PlanBuilder(planNodeIdGenerator)
                            .values({build})
                            .planNode(),
                        "t1 > u1",
                        {"t0", "t1", "u2", "t2"},
                        core::JoinType::kInner)
                    .planNode();
    return planHasTimestampWithTimeZone(plan);
  };

  ASSERT_FALSE(testHashJoin(
      {BIGINT(), INTEGER(), VARCHAR()}, {BIGINT(), INTEGER(), BIGINT()}));
  ASSERT_TRUE(testHashJoin(
      {BIGINT(), INTEGER(), VARCHAR()},
      {BIGINT(), INTEGER(), TIMESTAMP_WITH_TIME_ZONE()}));

  // Project node
  auto valuesVector = makeEmptyRowVector(
      ROW({"c0", "c1"}, {BIGINT(), TIMESTAMP_WITH_TIME_ZONE()}));
  plan = exec::test::PlanBuilder()
             .values({valuesVector})
             .project({"c0 * 10", "c1"})
             .planNode();

  ASSERT_TRUE(planHasTimestampWithTimeZone(plan));

  // Filter node
  plan = exec::test::PlanBuilder()
             .values({valuesVector})
             .filter("c0 > 100")
             .planNode();

  ASSERT_TRUE(planHasTimestampWithTimeZone(plan));
}

0 comments on commit 5c38d5a

Please sign in to comment.