Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[New Featrue] Support Vectorization Execution Engine Interface For Doris #6329

Merged
merged 2 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ check_function_exists(sched_getcpu HAVE_SCHED_GETCPU)
# -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG: enable nanosecond precision for boost
# -fno-omit-frame-pointers: Keep frame pointer for functions in register
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall -Wno-sign-compare -Wno-unknown-pragmas -pthread -Werror=strict-aliasing")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -fno-omit-frame-pointer")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -fstrict-aliasing -fno-omit-frame-pointer -Werror=return-type")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -std=gnu++17 -D__STDC_FORMAT_MACROS")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated -Wno-vla")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG")
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/compiler_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@
/// not a command. This should be used sparingly for cases when either the function
/// needs to be inlined for a specific reason or the compiler's heuristics make a bad
/// decision, e.g. not inlining a small function on a hot path.
#ifdef ALWAYS_INLINE
#undef ALWAYS_INLINE
#endif
#define ALWAYS_INLINE __attribute__((always_inline))
#define NO_INLINE __attribute__((__noinline__))
#define MAY_ALIAS __attribute__((__may_alias__))

#define ALIGN_CACHE_LINE __attribute__((aligned(CACHE_LINE_SIZE)))

Expand Down
1 change: 1 addition & 0 deletions be/src/common/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@
DCHECK(a == b) << "[ " #a " = " << static_cast<int>(a) << " , " #b " = " \
<< static_cast<int>(b) << " ]"

#include <fmt/format.h>
#endif
2 changes: 2 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ class Status {
/// trailing message.
Status clone_and_append(const Slice& msg) const;

operator bool() { return this->ok(); }

private:
const char* copy_state(const char* state);

Expand Down
13 changes: 11 additions & 2 deletions be/src/exec/aggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include <gperftools/profiler.h>
#include <math.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <sstream>

Expand Down Expand Up @@ -223,6 +222,15 @@ Status AggregationNode::open(RuntimeState* state) {
}

Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
// 1. `!need_finalize` means this aggregation node not the level two aggregation node
// 2. `_singleton_output_tuple != nullptr` means is not group by
// 3. `child(0)->rows_returned() == 0` mean not data from child
// in level two aggregation node should return NULL result
// level one aggregation node set `eos = true` return directly
if (UNLIKELY(!_needs_finalize && _singleton_output_tuple != nullptr && child(0)->rows_returned() == 0)) {
*eos = true;
return Status::OK();
}
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
Expand Down Expand Up @@ -403,7 +411,8 @@ Tuple* AggregationNode::finalize_tuple(Tuple* tuple, MemPool* pool) {
dst = Tuple::create(_output_tuple_desc->byte_size(), pool);
}
if (_needs_finalize) {
AggFnEvaluator::finalize(_aggregate_evaluators, _agg_fn_ctxs, tuple, dst);
AggFnEvaluator::finalize(_aggregate_evaluators, _agg_fn_ctxs, tuple, dst,
_singleton_output_tuple != nullptr && child(0)->rows_returned() == 0);
} else {
AggFnEvaluator::serialize(_aggregate_evaluators, _agg_fn_ctxs, tuple);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/broker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ BrokerScanNode::BrokerScanNode(ObjectPool* pool, const TPlanNode& tnode, const D
BrokerScanNode::~BrokerScanNode() {}

Status BrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ScanNode::init(tnode));
RETURN_IF_ERROR(ScanNode::init(tnode, state));
auto& broker_scan_node = tnode.broker_scan_node;

if (broker_scan_node.__isset.pre_filter_exprs) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/cross_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Status CrossJoinNode::construct_build_side(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
// TODO(zhaochun):
// RETURN_IF_ERROR(state->CheckQueryState());
bool eos = true;
bool eos = false;
RETURN_IF_ERROR(child(1)->get_next(state, batch, &eos));

// to prevent use too many memory
Expand Down
24 changes: 15 additions & 9 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
const std::vector<TExpr>& output_exprs,
const TPlanFragmentExecParams& params,
const RowDescriptor& row_desc,
bool is_vec,
boost::scoped_ptr<DataSink>* sink) {
DataSink* tmp_sink = NULL;

Expand All @@ -55,9 +56,12 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
? params.send_query_statistics_with_every_batch
: false;
// TODO: figure out good buffer size based on size of output row
tmp_sink = new DataStreamSender(pool, params.sender_id, row_desc, thrift_sink.stream_sink,
params.destinations, 16 * 1024,
send_query_statistics_with_every_batch);
if (is_vec) {
} else {
tmp_sink = new DataStreamSender(pool, params.sender_id, row_desc, thrift_sink.stream_sink,
params.destinations, 16 * 1024,
send_query_statistics_with_every_batch);
}
// RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
sink->reset(tmp_sink);
break;
Expand All @@ -68,7 +72,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
}

// TODO: figure out good buffer size based on size of output row
tmp_sink = new ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024);
if (is_vec) {
} else {
tmp_sink = new ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024);
}
sink->reset(tmp_sink);
break;
case TDataSinkType::MEMORY_SCRATCH_SINK:
Expand Down Expand Up @@ -98,8 +105,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.odbc_table_sink) {
return Status::InternalError("Missing data odbc sink.");
}
OdbcTableSink* odbc_tbl_sink = new OdbcTableSink(pool,
row_desc, output_exprs);
OdbcTableSink* odbc_tbl_sink = new OdbcTableSink(pool, row_desc, output_exprs);
sink->reset(odbc_tbl_sink);
break;
}
Expand Down Expand Up @@ -158,9 +164,9 @@ Status DataSink::init(const TDataSink& thrift_sink) {
}

Status DataSink::prepare(RuntimeState* state) {
_expr_mem_tracker = MemTracker::CreateTracker(
-1, _name + ":Expr:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker());
_expr_mem_tracker =
MemTracker::CreateTracker(-1, _name + ":Expr:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker());
return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/exec/data_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class DataSink {
const std::vector<TExpr>& output_exprs,
const TPlanFragmentExecParams& params,
const RowDescriptor& row_desc,
bool is_vec,
boost::scoped_ptr<DataSink>* sink);

// Returns the runtime profile for the sink.
Expand Down
62 changes: 47 additions & 15 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_rows_returned_counter(NULL),
_rows_returned_rate(NULL),
_memory_used_counter(NULL),
_is_closed(false) {
init_runtime_profile(print_plan_node_type(tnode.node_type));
}
_is_closed(false) {}

ExecNode::~ExecNode() {}

Expand Down Expand Up @@ -159,7 +157,18 @@ void ExecNode::push_down_predicate(RuntimeState* state, std::list<ExprContext*>*
}

Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
std::string profile;
if (state && state->enable_vectorized_exec()) {
profile = "V" + print_plan_node_type(tnode.node_type);
} else {
profile = print_plan_node_type(tnode.node_type);
}
init_runtime_profile(profile);

if (tnode.__isset.vconjunct) {
}
RETURN_IF_ERROR(Expr::create_expr_trees(_pool, tnode.conjuncts, &_conjunct_ctxs));

return Status::OK();
}

Expand All @@ -178,11 +187,11 @@ Status ExecNode::prepare(RuntimeState* state) {
_expr_mem_tracker = MemTracker::CreateTracker(-1, "ExecNode:Exprs:" + _runtime_profile->name(),
_mem_tracker);
_expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get()));
// TODO chenhao

RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, row_desc(), expr_mem_tracker()));

// TODO(zc):
// AddExprCtxsToFree(_conjunct_ctxs);

for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->prepare(state));
}
Expand Down Expand Up @@ -362,22 +371,32 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();

case TPlanNodeType::OLAP_SCAN_NODE:
*node = pool->add(new OlapScanNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new OlapScanNode(pool, tnode, descs));
}
return Status::OK();

case TPlanNodeType::AGGREGATION_NODE:
if (config::enable_partitioned_aggregation) {
*node = pool->add(new PartitionedAggregationNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new AggregationNode(pool, tnode, descs));
if (config::enable_partitioned_aggregation) {
*node = pool->add(new PartitionedAggregationNode(pool, tnode, descs));
} else {
*node = pool->add(new AggregationNode(pool, tnode, descs));
}
}
return Status::OK();

case TPlanNodeType::HASH_JOIN_NODE:
*node = pool->add(new HashJoinNode(pool, tnode, descs));
return Status::OK();

case TPlanNodeType::CROSS_JOIN_NODE:
*node = pool->add(new CrossJoinNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new CrossJoinNode(pool, tnode, descs));
}
return Status::OK();

case TPlanNodeType::MERGE_JOIN_NODE:
Expand All @@ -389,7 +408,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();

case TPlanNodeType::EXCHANGE_NODE:
*node = pool->add(new ExchangeNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new ExchangeNode(pool, tnode, descs));
}
return Status::OK();

case TPlanNodeType::SELECT_NODE:
Expand All @@ -401,10 +423,13 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();

case TPlanNodeType::SORT_NODE:
if (tnode.sort_node.use_top_n) {
*node = pool->add(new TopNNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new SpillSortNode(pool, tnode, descs));
if (tnode.sort_node.use_top_n) {
*node = pool->add(new TopNNode(pool, tnode, descs));
} else {
*node = pool->add(new SpillSortNode(pool, tnode, descs));
}
}

return Status::OK();
Expand All @@ -417,7 +442,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();

case TPlanNodeType::UNION_NODE:
*node = pool->add(new UnionNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new UnionNode(pool, tnode, descs));
}
return Status::OK();

case TPlanNodeType::INTERSECT_NODE:
Expand Down Expand Up @@ -624,4 +652,8 @@ Status ExecNode::QueryMaintenance(RuntimeState* state, const std::string& msg) {
return state->check_query_state(msg);
}

Status ExecNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
return Status::NotSupported("Not Implemented get block");
}

} // namespace doris
7 changes: 6 additions & 1 deletion be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "util/uid_util.h" // for print_id

namespace doris {

class Expr;
class ExprContext;
class ObjectPool;
Expand All @@ -46,6 +45,11 @@ class TupleRow;
class DataSink;
class MemTracker;

namespace vectorized {
class Block;
class VExpr;
}

using std::string;
using std::stringstream;
using std::vector;
Expand Down Expand Up @@ -97,6 +101,7 @@ class ExecNode {
// Caller must not be holding any io buffers. This will cause deadlock.
// TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) = 0;
virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);

// Resets the stream of row batches to be retrieved by subsequent GetNext() calls.
// Clears all internal state, returning this node to the state it was in after calling
Expand Down
Loading