Skip to content

Commit

Permalink
[New Feature] Support Vectorization Execution Engine Interface For Do…
Browse files Browse the repository at this point in the history
…ris (#6329)

1. FE vectorized plan code
2. Function register vec function
3. Diff function nullable type
4. New thirdparty code and new thrift struct
  • Loading branch information
HappenLee authored Aug 11, 2021
1 parent 1a5b031 commit 9216735
Show file tree
Hide file tree
Showing 120 changed files with 2,767 additions and 1,009 deletions.
2 changes: 1 addition & 1 deletion be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ check_function_exists(sched_getcpu HAVE_SCHED_GETCPU)
# -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG: enable nanosecond precision for boost
# -fno-omit-frame-pointers: Keep frame pointer for functions in register
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall -Wno-sign-compare -Wno-unknown-pragmas -pthread -Werror=strict-aliasing")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -fno-omit-frame-pointer")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -fstrict-aliasing -fno-omit-frame-pointer -Werror=return-type")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -std=gnu++17 -D__STDC_FORMAT_MACROS")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated -Wno-vla")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG")
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/compiler_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@
/// not a command. This should be used sparingly for cases when either the function
/// needs to be inlined for a specific reason or the compiler's heuristics make a bad
/// decision, e.g. not inlining a small function on a hot path.
#if defined(ALWAYS_INLINE)
// A previous definition is already in effect; discard it so the definition
// below applies without a macro-redefinition diagnostic.
#undef ALWAYS_INLINE
#endif
#define ALWAYS_INLINE __attribute__((always_inline))
// Counterpart of ALWAYS_INLINE: asks the compiler not to inline the function.
#define NO_INLINE __attribute__((__noinline__))
// Applies the compiler's `may_alias` type attribute to the annotated type.
#define MAY_ALIAS __attribute__((__may_alias__))

#define ALIGN_CACHE_LINE __attribute__((aligned(CACHE_LINE_SIZE)))

Expand Down
1 change: 1 addition & 0 deletions be/src/common/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@
DCHECK(a == b) << "[ " #a " = " << static_cast<int>(a) << " , " #b " = " \
<< static_cast<int>(b) << " ]"

#include <fmt/format.h>
#endif
2 changes: 2 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ class Status {
/// trailing message.
Status clone_and_append(const Slice& msg) const;

operator bool() { return this->ok(); }

private:
const char* copy_state(const char* state);

Expand Down
13 changes: 11 additions & 2 deletions be/src/exec/aggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include <gperftools/profiler.h>
#include <math.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <sstream>

Expand Down Expand Up @@ -223,6 +222,15 @@ Status AggregationNode::open(RuntimeState* state) {
}

Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
// 1. `!_needs_finalize` means this aggregation node is not the level-two aggregation node.
// 2. `_singleton_output_tuple != nullptr` means there is no GROUP BY.
// 3. `child(0)->rows_returned() == 0` means there is no data from the child.
// In this case the level-two aggregation node should return a NULL result,
// while the level-one aggregation node sets `eos = true` and returns directly.
if (UNLIKELY(!_needs_finalize && _singleton_output_tuple != nullptr && child(0)->rows_returned() == 0)) {
*eos = true;
return Status::OK();
}
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
Expand Down Expand Up @@ -403,7 +411,8 @@ Tuple* AggregationNode::finalize_tuple(Tuple* tuple, MemPool* pool) {
dst = Tuple::create(_output_tuple_desc->byte_size(), pool);
}
if (_needs_finalize) {
AggFnEvaluator::finalize(_aggregate_evaluators, _agg_fn_ctxs, tuple, dst);
AggFnEvaluator::finalize(_aggregate_evaluators, _agg_fn_ctxs, tuple, dst,
_singleton_output_tuple != nullptr && child(0)->rows_returned() == 0);
} else {
AggFnEvaluator::serialize(_aggregate_evaluators, _agg_fn_ctxs, tuple);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/broker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ BrokerScanNode::BrokerScanNode(ObjectPool* pool, const TPlanNode& tnode, const D
// The destructor body performs no work of its own.
BrokerScanNode::~BrokerScanNode() = default;

Status BrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ScanNode::init(tnode));
RETURN_IF_ERROR(ScanNode::init(tnode, state));
auto& broker_scan_node = tnode.broker_scan_node;

if (broker_scan_node.__isset.pre_filter_exprs) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/cross_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Status CrossJoinNode::construct_build_side(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
// TODO(zhaochun):
// RETURN_IF_ERROR(state->CheckQueryState());
bool eos = true;
bool eos = false;
RETURN_IF_ERROR(child(1)->get_next(state, batch, &eos));

// to prevent use too many memory
Expand Down
24 changes: 15 additions & 9 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
const std::vector<TExpr>& output_exprs,
const TPlanFragmentExecParams& params,
const RowDescriptor& row_desc,
bool is_vec,
boost::scoped_ptr<DataSink>* sink) {
DataSink* tmp_sink = NULL;

Expand All @@ -55,9 +56,12 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
? params.send_query_statistics_with_every_batch
: false;
// TODO: figure out good buffer size based on size of output row
tmp_sink = new DataStreamSender(pool, params.sender_id, row_desc, thrift_sink.stream_sink,
params.destinations, 16 * 1024,
send_query_statistics_with_every_batch);
if (is_vec) {
} else {
tmp_sink = new DataStreamSender(pool, params.sender_id, row_desc, thrift_sink.stream_sink,
params.destinations, 16 * 1024,
send_query_statistics_with_every_batch);
}
// RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
sink->reset(tmp_sink);
break;
Expand All @@ -68,7 +72,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
}

// TODO: figure out good buffer size based on size of output row
tmp_sink = new ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024);
if (is_vec) {
} else {
tmp_sink = new ResultSink(row_desc, output_exprs, thrift_sink.result_sink, 1024);
}
sink->reset(tmp_sink);
break;
case TDataSinkType::MEMORY_SCRATCH_SINK:
Expand Down Expand Up @@ -98,8 +105,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.odbc_table_sink) {
return Status::InternalError("Missing data odbc sink.");
}
OdbcTableSink* odbc_tbl_sink = new OdbcTableSink(pool,
row_desc, output_exprs);
OdbcTableSink* odbc_tbl_sink = new OdbcTableSink(pool, row_desc, output_exprs);
sink->reset(odbc_tbl_sink);
break;
}
Expand Down Expand Up @@ -158,9 +164,9 @@ Status DataSink::init(const TDataSink& thrift_sink) {
}

Status DataSink::prepare(RuntimeState* state) {
_expr_mem_tracker = MemTracker::CreateTracker(
-1, _name + ":Expr:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker());
_expr_mem_tracker =
MemTracker::CreateTracker(-1, _name + ":Expr:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker());
return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/exec/data_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class DataSink {
const std::vector<TExpr>& output_exprs,
const TPlanFragmentExecParams& params,
const RowDescriptor& row_desc,
bool is_vec,
boost::scoped_ptr<DataSink>* sink);

// Returns the runtime profile for the sink.
Expand Down
62 changes: 47 additions & 15 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_rows_returned_counter(NULL),
_rows_returned_rate(NULL),
_memory_used_counter(NULL),
_is_closed(false) {
init_runtime_profile(print_plan_node_type(tnode.node_type));
}
_is_closed(false) {}

// The destructor body performs no work of its own.
ExecNode::~ExecNode() = default;

Expand Down Expand Up @@ -159,7 +157,18 @@ void ExecNode::push_down_predicate(RuntimeState* state, std::list<ExprContext*>*
}

// Initializes this node from its thrift plan node.
//  - Names the runtime profile after the plan node type, prefixed with "V"
//    when `state` is non-null and reports vectorized execution as enabled.
//  - Builds the expression trees for `tnode.conjuncts` into `_conjunct_ctxs`;
//    a failure there is propagated through RETURN_IF_ERROR.
// Returns Status::OK() when everything succeeded.
Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
    // `state` may be null, so guard before asking it about vectorization.
    const bool vectorized = (state != nullptr) && state->enable_vectorized_exec();

    std::string profile_name = print_plan_node_type(tnode.node_type);
    if (vectorized) {
        profile_name = "V" + profile_name;
    }
    init_runtime_profile(profile_name);

    // Placeholder branch: nothing is done with `vconjunct` here yet.
    if (tnode.__isset.vconjunct) {
    }

    RETURN_IF_ERROR(Expr::create_expr_trees(_pool, tnode.conjuncts, &_conjunct_ctxs));
    return Status::OK();
}

Expand All @@ -178,11 +187,11 @@ Status ExecNode::prepare(RuntimeState* state) {
_expr_mem_tracker = MemTracker::CreateTracker(-1, "ExecNode:Exprs:" + _runtime_profile->name(),
_mem_tracker);
_expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get()));
// TODO chenhao

RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, row_desc(), expr_mem_tracker()));

// TODO(zc):
// AddExprCtxsToFree(_conjunct_ctxs);

for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->prepare(state));
}
Expand Down Expand Up @@ -362,22 +371,32 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();

case TPlanNodeType::OLAP_SCAN_NODE:
*node = pool->add(new OlapScanNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new OlapScanNode(pool, tnode, descs));
}
return Status::OK();

case TPlanNodeType::AGGREGATION_NODE:
if (config::enable_partitioned_aggregation) {
*node = pool->add(new PartitionedAggregationNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new AggregationNode(pool, tnode, descs));
if (config::enable_partitioned_aggregation) {
*node = pool->add(new PartitionedAggregationNode(pool, tnode, descs));
} else {
*node = pool->add(new AggregationNode(pool, tnode, descs));
}
}
return Status::OK();

case TPlanNodeType::HASH_JOIN_NODE:
*node = pool->add(new HashJoinNode(pool, tnode, descs));
return Status::OK();

case TPlanNodeType::CROSS_JOIN_NODE:
*node = pool->add(new CrossJoinNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new CrossJoinNode(pool, tnode, descs));
}
return Status::OK();

case TPlanNodeType::MERGE_JOIN_NODE:
Expand All @@ -389,7 +408,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();

case TPlanNodeType::EXCHANGE_NODE:
*node = pool->add(new ExchangeNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new ExchangeNode(pool, tnode, descs));
}
return Status::OK();

case TPlanNodeType::SELECT_NODE:
Expand All @@ -401,10 +423,13 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();

case TPlanNodeType::SORT_NODE:
if (tnode.sort_node.use_top_n) {
*node = pool->add(new TopNNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new SpillSortNode(pool, tnode, descs));
if (tnode.sort_node.use_top_n) {
*node = pool->add(new TopNNode(pool, tnode, descs));
} else {
*node = pool->add(new SpillSortNode(pool, tnode, descs));
}
}

return Status::OK();
Expand All @@ -417,7 +442,10 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();

case TPlanNodeType::UNION_NODE:
*node = pool->add(new UnionNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
} else {
*node = pool->add(new UnionNode(pool, tnode, descs));
}
return Status::OK();

case TPlanNodeType::INTERSECT_NODE:
Expand Down Expand Up @@ -624,4 +652,8 @@ Status ExecNode::QueryMaintenance(RuntimeState* state, const std::string& msg) {
return state->check_query_state(msg);
}

// Base-class version of the Block-based get_next(): it never touches its
// arguments and always reports that the operation is not supported.
Status ExecNode::get_next(RuntimeState* /*state*/, vectorized::Block* /*block*/, bool* /*eos*/) {
    return Status::NotSupported("Not Implemented get block");
}

} // namespace doris
7 changes: 6 additions & 1 deletion be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "util/uid_util.h" // for print_id

namespace doris {

class Expr;
class ExprContext;
class ObjectPool;
Expand All @@ -46,6 +45,11 @@ class TupleRow;
class DataSink;
class MemTracker;

// Forward declarations only: lets this header mention the vectorized types by
// pointer or reference without including their definitions.
namespace vectorized {
class VExpr;
class Block;
} // namespace vectorized

using std::string;
using std::stringstream;
using std::vector;
Expand Down Expand Up @@ -97,6 +101,7 @@ class ExecNode {
// Caller must not be holding any io buffers. This will cause deadlock.
// TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) = 0;
// Overload of get_next() that takes a vectorized::Block instead of a RowBatch.
// Unlike the RowBatch overload it is not pure virtual; the ExecNode definition
// in exec_node.cpp returns Status::NotSupported("Not Implemented get block").
virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);

// Resets the stream of row batches to be retrieved by subsequent GetNext() calls.
// Clears all internal state, returning this node to the state it was in after calling
Expand Down
Loading

0 comments on commit 9216735

Please sign in to comment.