Skip to content

Commit

Permalink
[performance](variant) support topn 2phase read for variant column (a…
Browse files Browse the repository at this point in the history
…pache#28318)

 [performance](variant) support topn 2phase read for variant column
  • Loading branch information
eldenmoon authored and stephen committed Dec 28, 2023
1 parent 9cfeef6 commit fef8516
Show file tree
Hide file tree
Showing 20 changed files with 261 additions and 110 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class ExtractReader : public ColumnIterator {
private:
Status extract_to(vectorized::MutableColumnPtr& dst, size_t nrows);

const TabletColumn& _col;
TabletColumn _col;
// may shared among different column iterators
std::unique_ptr<StreamReader> _root_reader;
};
Expand Down
67 changes: 62 additions & 5 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@
#include "util/slice.h" // Slice
#include "vec/columns/column.h"
#include "vec/common/string_ref.h"
#include "vec/core/field.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_object.h"
#include "vec/json/path_in_data.h"
#include "vec/olap/vgeneric_iterators.h"

namespace doris {
Expand Down Expand Up @@ -332,17 +334,18 @@ Status Segment::_load_index_impl() {

// Return the storage datatype of related column to field.
// Return nullptr meaning no such storage infomation for this column
vectorized::DataTypePtr Segment::get_data_type_of(const Field& field, bool ignore_children) const {
vectorized::DataTypePtr Segment::get_data_type_of(vectorized::PathInData path, bool is_nullable,
bool ignore_children) const {
// Path has higher priority
if (!field.path().empty()) {
auto node = _sub_column_tree.find_leaf(field.path());
if (!path.empty()) {
auto node = _sub_column_tree.find_leaf(path);
if (node) {
if (ignore_children || node->children.empty()) {
return node->data.file_column_type;
}
}
// it contains children or column missing in storage, so treat it as variant
return field.is_nullable()
return is_nullable
? vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>())
: std::make_shared<vectorized::DataTypeObject>();
}
Expand Down Expand Up @@ -686,7 +689,8 @@ Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) {

bool Segment::same_with_storage_type(int32_t cid, const Schema& schema,
bool ignore_children) const {
auto file_column_type = get_data_type_of(*schema.column(cid), ignore_children);
auto file_column_type = get_data_type_of(schema.column(cid)->path(),
schema.column(cid)->is_nullable(), ignore_children);
auto expected_type = Schema::get_data_type_ptr(*schema.column(cid));
#ifndef NDEBUG
if (file_column_type && !file_column_type->equals(*expected_type)) {
Expand All @@ -700,5 +704,58 @@ bool Segment::same_with_storage_type(int32_t cid, const Schema& schema,
return same;
}

Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot,
uint32_t row_id, vectorized::MutableColumnPtr& result,
OlapReaderStatistics& stats,
std::unique_ptr<ColumnIterator>& iterator_hint) {
StorageReadOptions storage_read_opt;
storage_read_opt.io_ctx.reader_type = ReaderType::READER_QUERY;
segment_v2::ColumnIteratorOptions opt {
.use_page_cache = !config::disable_storage_page_cache,
.file_reader = file_reader().get(),
.stats = &stats,
.io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY},
};
std::vector<segment_v2::rowid_t> single_row_loc {row_id};
if (!slot->column_paths().empty()) {
vectorized::PathInData path(schema.column_by_uid(slot->col_unique_id()).name_lower_case(),
slot->column_paths());
auto storage_type = get_data_type_of(path, slot->is_nullable(), false);
vectorized::MutableColumnPtr file_storage_column = storage_type->create_column();
DCHECK(storage_type != nullptr);
TabletColumn column = TabletColumn::create_materialized_variant_column(
schema.column_by_uid(slot->col_unique_id()).name_lower_case(), slot->column_paths(),
slot->col_unique_id());
if (iterator_hint == nullptr) {
RETURN_IF_ERROR(new_column_iterator(column, &iterator_hint, &storage_read_opt));
RETURN_IF_ERROR(iterator_hint->init(opt));
}
RETURN_IF_ERROR(
iterator_hint->read_by_rowids(single_row_loc.data(), 1, file_storage_column));
// iterator_hint.reset(nullptr);
// Get it's inner field, for JSONB case
vectorized::Field field = remove_nullable(storage_type)->get_default();
file_storage_column->get(0, field);
result->insert(field);
} else {
int index = (slot->col_unique_id() >= 0) ? schema.field_index(slot->col_unique_id())
: schema.field_index(slot->col_name());
if (index < 0) {
std::stringstream ss;
ss << "field name is invalid. field=" << slot->col_name()
<< ", field_name_to_index=" << schema.get_all_field_names();
return Status::InternalError(ss.str());
}
storage_read_opt.io_ctx.reader_type = ReaderType::READER_QUERY;
if (iterator_hint == nullptr) {
RETURN_IF_ERROR(
new_column_iterator(schema.column(index), &iterator_hint, &storage_read_opt));
RETURN_IF_ERROR(iterator_hint->init(opt));
}
RETURN_IF_ERROR(iterator_hint->read_by_rowids(single_row_loc.data(), 1, result));
}
return Status::OK();
}

} // namespace segment_v2
} // namespace doris
14 changes: 11 additions & 3 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@
#include "olap/rowset/segment_v2/page_handle.h"
#include "olap/schema.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "util/once.h"
#include "util/slice.h"
#include "vec/columns/column.h"
#include "vec/columns/subcolumn_tree.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/json/path_in_data.h"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -123,6 +126,10 @@ class Segment : public std::enable_shared_from_this<Segment> {

Status read_key_by_rowid(uint32_t row_id, std::string* key);

Status seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot, uint32_t row_id,
vectorized::MutableColumnPtr& result, OlapReaderStatistics& stats,
std::unique_ptr<ColumnIterator>& iterator_hint);

Status load_index();

Status load_pk_index_and_bf();
Expand All @@ -146,7 +153,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
// ignore_chidren set to false will treat field as variant
// when it contains children with field paths.
// nullptr will returned if storage type does not contains such column
std::shared_ptr<const vectorized::IDataType> get_data_type_of(const Field& filed,
std::shared_ptr<const vectorized::IDataType> get_data_type_of(vectorized::PathInData path,
bool is_nullable,
bool ignore_children) const;

// Check is schema read type equals storage column type
Expand All @@ -157,8 +165,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
bool can_apply_predicate_safely(int cid, Predicate* pred, const Schema& schema,
ReaderType read_type) const {
const Field* col = schema.column(cid);
vectorized::DataTypePtr storage_column_type =
get_data_type_of(*col, read_type != ReaderType::READER_QUERY);
vectorized::DataTypePtr storage_column_type = get_data_type_of(
col->path(), col->is_nullable(), read_type != ReaderType::READER_QUERY);
if (storage_column_type == nullptr) {
// Default column iterator
return true;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
const Field* col = _schema->column(i);
if (col) {
auto storage_type = _segment->get_data_type_of(
*col, _opts.io_ctx.reader_type != ReaderType::READER_QUERY);
col->path(), col->is_nullable(),
_opts.io_ctx.reader_type != ReaderType::READER_QUERY);
if (storage_type == nullptr) {
storage_type = vectorized::DataTypeFactory::instance().create_data_type(*col);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ class SegmentIterator : public RowwiseIterator {
if (block_cid >= block->columns()) {
continue;
}
vectorized::DataTypePtr storage_type =
_segment->get_data_type_of(*_schema->column(cid), false);
vectorized::DataTypePtr storage_type = _segment->get_data_type_of(
_schema->column(cid)->path(), _schema->column(cid)->is_nullable(), false);
if (storage_type && !storage_type->equals(*block->get_by_position(block_cid).type)) {
// Do additional cast
vectorized::MutableColumnPtr tmp = storage_type->create_column();
Expand Down
14 changes: 14 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,20 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
}
}

TabletColumn TabletColumn::create_materialized_variant_column(const std::string& root,
const std::vector<std::string>& paths,
int32_t parent_unique_id) {
TabletColumn subcol;
subcol.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
subcol.set_is_nullable(true);
subcol.set_unique_id(-1);
subcol.set_parent_unique_id(parent_unique_id);
vectorized::PathInData path(root, paths);
subcol.set_path_info(path);
subcol.set_name(path.get_path());
return subcol;
}

void TabletColumn::to_schema_pb(ColumnPB* column) const {
column->set_unique_id(_unique_id);
column->set_name(_col_name);
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "gutil/stringprintf.h"
#include "olap/olap_common.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "util/string_util.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/string_utils/string_utils.h"
Expand Down Expand Up @@ -91,6 +92,11 @@ class TabletColumn {
_type == FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE ||
_type == FieldType::OLAP_FIELD_TYPE_AGG_STATE;
}
// Such columns are not exist in frontend schema info, so we need to
// add them into tablet_schema for later column indexing.
static TabletColumn create_materialized_variant_column(const std::string& root,
const std::vector<std::string>& paths,
int32_t parent_unique_id);
bool has_default_value() const { return _has_default_value; }
std::string default_value() const { return _default_value; }
size_t length() const { return _length; }
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
_is_materialized(pdesc.is_materialized()),
_is_key(pdesc.is_key()),
_need_materialize(true),
_column_paths(pdesc.column_paths().begin(), pdesc.column_paths().end()),
_is_auto_increment(pdesc.is_auto_increment()) {}

void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
Expand All @@ -103,6 +104,9 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
pslot->set_is_key(_is_key);
pslot->set_is_auto_increment(_is_auto_increment);
pslot->set_col_type(_col_type);
for (const std::string& path : _column_paths) {
pslot->add_column_paths(path);
}
}

vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const {
Expand Down
Loading

0 comments on commit fef8516

Please sign in to comment.