Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[performance](variant) support topn 2phase read for variant column #28318

Merged
merged 7 commits into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class ExtractReader : public ColumnIterator {
private:
Status extract_to(vectorized::MutableColumnPtr& dst, size_t nrows);

const TabletColumn& _col;
TabletColumn _col;
// may shared among different column iterators
std::unique_ptr<StreamReader> _root_reader;
};
Expand Down
67 changes: 62 additions & 5 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@
#include "util/slice.h" // Slice
#include "vec/columns/column.h"
#include "vec/common/string_ref.h"
#include "vec/core/field.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_object.h"
#include "vec/json/path_in_data.h"
#include "vec/olap/vgeneric_iterators.h"

namespace doris {
Expand Down Expand Up @@ -332,17 +334,18 @@ Status Segment::_load_index_impl() {

// Return the storage datatype of related column to field.
// Return nullptr meaning no such storage infomation for this column
vectorized::DataTypePtr Segment::get_data_type_of(const Field& field, bool ignore_children) const {
vectorized::DataTypePtr Segment::get_data_type_of(vectorized::PathInData path, bool is_nullable,
bool ignore_children) const {
// Path has higher priority
if (!field.path().empty()) {
auto node = _sub_column_tree.find_leaf(field.path());
if (!path.empty()) {
auto node = _sub_column_tree.find_leaf(path);
if (node) {
if (ignore_children || node->children.empty()) {
return node->data.file_column_type;
}
}
// it contains children or column missing in storage, so treat it as variant
return field.is_nullable()
return is_nullable
? vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>())
: std::make_shared<vectorized::DataTypeObject>();
}
Expand Down Expand Up @@ -686,7 +689,8 @@ Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) {

bool Segment::same_with_storage_type(int32_t cid, const Schema& schema,
bool ignore_children) const {
auto file_column_type = get_data_type_of(*schema.column(cid), ignore_children);
auto file_column_type = get_data_type_of(schema.column(cid)->path(),
schema.column(cid)->is_nullable(), ignore_children);
auto expected_type = Schema::get_data_type_ptr(*schema.column(cid));
#ifndef NDEBUG
if (file_column_type && !file_column_type->equals(*expected_type)) {
Expand All @@ -700,5 +704,58 @@ bool Segment::same_with_storage_type(int32_t cid, const Schema& schema,
return same;
}

Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot,
uint32_t row_id, vectorized::MutableColumnPtr& result,
OlapReaderStatistics& stats,
std::unique_ptr<ColumnIterator>& iterator_hint) {
StorageReadOptions storage_read_opt;
storage_read_opt.io_ctx.reader_type = ReaderType::READER_QUERY;
segment_v2::ColumnIteratorOptions opt {
.use_page_cache = !config::disable_storage_page_cache,
.file_reader = file_reader().get(),
.stats = &stats,
.io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY},
};
std::vector<segment_v2::rowid_t> single_row_loc {row_id};
if (!slot->column_paths().empty()) {
vectorized::PathInData path(schema.column_by_uid(slot->col_unique_id()).name_lower_case(),
slot->column_paths());
auto storage_type = get_data_type_of(path, slot->is_nullable(), false);
vectorized::MutableColumnPtr file_storage_column = storage_type->create_column();
DCHECK(storage_type != nullptr);
TabletColumn column = TabletColumn::create_materialized_variant_column(
schema.column_by_uid(slot->col_unique_id()).name_lower_case(), slot->column_paths(),
slot->col_unique_id());
if (iterator_hint == nullptr) {
RETURN_IF_ERROR(new_column_iterator(column, &iterator_hint, &storage_read_opt));
RETURN_IF_ERROR(iterator_hint->init(opt));
}
RETURN_IF_ERROR(
iterator_hint->read_by_rowids(single_row_loc.data(), 1, file_storage_column));
// iterator_hint.reset(nullptr);
// Get it's inner field, for JSONB case
vectorized::Field field = remove_nullable(storage_type)->get_default();
file_storage_column->get(0, field);
result->insert(field);
} else {
int index = (slot->col_unique_id() >= 0) ? schema.field_index(slot->col_unique_id())
: schema.field_index(slot->col_name());
if (index < 0) {
std::stringstream ss;
ss << "field name is invalid. field=" << slot->col_name()
<< ", field_name_to_index=" << schema.get_all_field_names();
return Status::InternalError(ss.str());
}
storage_read_opt.io_ctx.reader_type = ReaderType::READER_QUERY;
if (iterator_hint == nullptr) {
RETURN_IF_ERROR(
new_column_iterator(schema.column(index), &iterator_hint, &storage_read_opt));
RETURN_IF_ERROR(iterator_hint->init(opt));
}
RETURN_IF_ERROR(iterator_hint->read_by_rowids(single_row_loc.data(), 1, result));
}
return Status::OK();
}

} // namespace segment_v2
} // namespace doris
14 changes: 11 additions & 3 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@
#include "olap/rowset/segment_v2/page_handle.h"
#include "olap/schema.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "util/once.h"
#include "util/slice.h"
#include "vec/columns/column.h"
#include "vec/columns/subcolumn_tree.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/json/path_in_data.h"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -123,6 +126,10 @@ class Segment : public std::enable_shared_from_this<Segment> {

Status read_key_by_rowid(uint32_t row_id, std::string* key);

Status seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* slot, uint32_t row_id,
vectorized::MutableColumnPtr& result, OlapReaderStatistics& stats,
std::unique_ptr<ColumnIterator>& iterator_hint);

Status load_index();

Status load_pk_index_and_bf();
Expand All @@ -146,7 +153,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
// ignore_chidren set to false will treat field as variant
// when it contains children with field paths.
// nullptr will returned if storage type does not contains such column
std::shared_ptr<const vectorized::IDataType> get_data_type_of(const Field& filed,
std::shared_ptr<const vectorized::IDataType> get_data_type_of(vectorized::PathInData path,
bool is_nullable,
bool ignore_children) const;

// Check is schema read type equals storage column type
Expand All @@ -157,8 +165,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
bool can_apply_predicate_safely(int cid, Predicate* pred, const Schema& schema,
ReaderType read_type) const {
const Field* col = schema.column(cid);
vectorized::DataTypePtr storage_column_type =
get_data_type_of(*col, read_type != ReaderType::READER_QUERY);
vectorized::DataTypePtr storage_column_type = get_data_type_of(
col->path(), col->is_nullable(), read_type != ReaderType::READER_QUERY);
if (storage_column_type == nullptr) {
// Default column iterator
return true;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
const Field* col = _schema->column(i);
if (col) {
auto storage_type = _segment->get_data_type_of(
*col, _opts.io_ctx.reader_type != ReaderType::READER_QUERY);
col->path(), col->is_nullable(),
_opts.io_ctx.reader_type != ReaderType::READER_QUERY);
if (storage_type == nullptr) {
storage_type = vectorized::DataTypeFactory::instance().create_data_type(*col);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ class SegmentIterator : public RowwiseIterator {
if (block_cid >= block->columns()) {
continue;
}
vectorized::DataTypePtr storage_type =
_segment->get_data_type_of(*_schema->column(cid), false);
vectorized::DataTypePtr storage_type = _segment->get_data_type_of(
_schema->column(cid)->path(), _schema->column(cid)->is_nullable(), false);
if (storage_type && !storage_type->equals(*block->get_by_position(block_cid).type)) {
// Do additional cast
vectorized::MutableColumnPtr tmp = storage_type->create_column();
Expand Down
14 changes: 14 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,20 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
}
}

TabletColumn TabletColumn::create_materialized_variant_column(const std::string& root,
const std::vector<std::string>& paths,
int32_t parent_unique_id) {
TabletColumn subcol;
subcol.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT);
subcol.set_is_nullable(true);
subcol.set_unique_id(-1);
subcol.set_parent_unique_id(parent_unique_id);
vectorized::PathInData path(root, paths);
subcol.set_path_info(path);
subcol.set_name(path.get_path());
return subcol;
}

void TabletColumn::to_schema_pb(ColumnPB* column) const {
column->set_unique_id(_unique_id);
column->set_name(_col_name);
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "gutil/stringprintf.h"
#include "olap/olap_common.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "util/string_util.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/string_utils/string_utils.h"
Expand Down Expand Up @@ -91,6 +92,11 @@ class TabletColumn {
_type == FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE ||
_type == FieldType::OLAP_FIELD_TYPE_AGG_STATE;
}
// Such columns are not exist in frontend schema info, so we need to
// add them into tablet_schema for later column indexing.
static TabletColumn create_materialized_variant_column(const std::string& root,
const std::vector<std::string>& paths,
int32_t parent_unique_id);
bool has_default_value() const { return _has_default_value; }
std::string default_value() const { return _default_value; }
size_t length() const { return _length; }
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
_is_materialized(pdesc.is_materialized()),
_is_key(pdesc.is_key()),
_need_materialize(true),
_column_paths(pdesc.column_paths().begin(), pdesc.column_paths().end()),
_is_auto_increment(pdesc.is_auto_increment()) {}

void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
Expand All @@ -103,6 +104,9 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
pslot->set_is_key(_is_key);
pslot->set_is_auto_increment(_is_auto_increment);
pslot->set_col_type(_col_type);
for (const std::string& path : _column_paths) {
pslot->add_column_paths(path);
}
}

vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const {
Expand Down
Loading
Loading