Skip to content

Commit

Permalink
[fix](inverted index) fix multi match result error (#38931)
Browse files Browse the repository at this point in the history
1. multi_match result merging logic is incorrect
  • Loading branch information
zzzxl1993 authored Aug 7, 2024
1 parent 415056b commit 0c8fd93
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 36 deletions.
14 changes: 6 additions & 8 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1476,13 +1476,12 @@ Status SegmentIterator::_init_inverted_index_iterators() {
}

Status SegmentIterator::_init_inverted_index_iterators(ColumnId cid) {
std::lock_guard lock(_idx_init_lock);
if (_inverted_index_iterators[cid] == nullptr) {
return _init_single_inverted_index_iterator.call([&] {
return _segment->new_inverted_index_iterator(
_opts.tablet_schema->column(cid),
_segment->_tablet_schema->get_inverted_index(_opts.tablet_schema->column(cid)),
_opts, &_inverted_index_iterators[cid]);
});
return _segment->new_inverted_index_iterator(
_opts.tablet_schema->column(cid),
_segment->_tablet_schema->get_inverted_index(_opts.tablet_schema->column(cid)),
_opts, &_inverted_index_iterators[cid]);
}
return Status::OK();
}
Expand Down Expand Up @@ -3066,9 +3065,8 @@ Status SegmentIterator::execute_func_expr(const vectorized::VExprSPtr& expr,
params._unique_id = _schema->unique_id(slot_expr->column_id());
params._column_name = _opts.tablet_schema->column(params._column_id).name();
params._segment_iterator = this;
params.result = result;

return expr->eval_inverted_index(expr_ctx.get(), params);
return expr->eval_inverted_index(expr_ctx.get(), params, result);
}

} // namespace segment_v2
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ class SegmentIterator : public RowwiseIterator {
std::unordered_map<int, std::unordered_map<std::string, bool>>
_column_predicate_inverted_index_status;

DorisCallOnce<Status> _init_single_inverted_index_iterator;
std::mutex _idx_init_lock;
};

} // namespace segment_v2
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/exprs/vectorized_fn_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,9 @@ bool VectorizedFnCall::can_fast_execute() const {
}

Status VectorizedFnCall::eval_inverted_index(VExprContext* context,
segment_v2::FuncExprParams& params) {
return _function->eval_inverted_index(context->fn_context(_fn_context_index), params);
segment_v2::FuncExprParams& params,
std::shared_ptr<roaring::Roaring>& result) {
return _function->eval_inverted_index(context->fn_context(_fn_context_index), params, result);
}

bool VectorizedFnCall::equals(const VExpr& other) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exprs/vectorized_fn_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class VectorizedFnCall : public VExpr {

bool can_push_down_to_index() const override;
bool can_fast_execute() const override;
Status eval_inverted_index(VExprContext* context, segment_v2::FuncExprParams& params) override;
Status eval_inverted_index(VExprContext* context, segment_v2::FuncExprParams& params,
std::shared_ptr<roaring::Roaring>& result) override;
bool equals(const VExpr& other) override;

protected:
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exprs/vexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ class VExpr {

virtual bool can_push_down_to_index() const { return false; }
virtual bool can_fast_execute() const { return false; }
virtual Status eval_inverted_index(VExprContext* context, segment_v2::FuncExprParams& params) {
virtual Status eval_inverted_index(VExprContext* context, segment_v2::FuncExprParams& params,
std::shared_ptr<roaring::Roaring>& result) {
return Status::NotSupported("Not supported execute_with_inverted_index");
}
virtual bool equals(const VExpr& other);
Expand Down
10 changes: 5 additions & 5 deletions be/src/vec/functions/function.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ class IFunctionBase {
}

virtual bool can_push_down_to_index() const { return false; }
virtual Status eval_inverted_index(FunctionContext* context,
segment_v2::FuncExprParams& params) {
virtual Status eval_inverted_index(FunctionContext* context, segment_v2::FuncExprParams& params,
std::shared_ptr<roaring::Roaring>& result) {
return Status::NotSupported("eval_inverted_index is not supported in function: ",
get_name());
}
Expand Down Expand Up @@ -543,9 +543,9 @@ class DefaultFunction final : public IFunctionBase {
}

bool can_push_down_to_index() const override { return function->can_push_down_to_index(); }
Status eval_inverted_index(FunctionContext* context,
segment_v2::FuncExprParams& params) override {
return function->eval_inverted_index(context, params);
Status eval_inverted_index(FunctionContext* context, segment_v2::FuncExprParams& params,
std::shared_ptr<roaring::Roaring>& result) override {
return function->eval_inverted_index(context, params, result);
}

private:
Expand Down
33 changes: 17 additions & 16 deletions be/src/vec/functions/function_multi_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ Status FunctionMultiMatch::open(FunctionContext* context,
field_names_str.end());
std::vector<std::string> field_names;
boost::split(field_names, field_names_str, boost::algorithm::is_any_of(","));
state->fields.insert(field_names.begin(), field_names.end());
for (const auto& field_name : field_names) {
if (!field_name.empty()) {
state->fields.insert(field_name);
}
}
} break;
case 2:
state->type = const_data.to_string();
Expand All @@ -93,7 +97,8 @@ Status FunctionMultiMatch::open(FunctionContext* context,
}

Status FunctionMultiMatch::eval_inverted_index(FunctionContext* context,
segment_v2::FuncExprParams& params) {
segment_v2::FuncExprParams& params,
std::shared_ptr<roaring::Roaring>& result) {
auto* match_param = reinterpret_cast<MatchParam*>(
context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
if (match_param == nullptr) {
Expand All @@ -106,7 +111,6 @@ Status FunctionMultiMatch::eval_inverted_index(FunctionContext* context,
const auto& tablet_schema = opts.tablet_schema;

std::vector<ColumnId> columns_ids;

for (const auto& column_name : match_param->fields) {
auto cid = tablet_schema->field_index(column_name);
if (cid < 0) {
Expand Down Expand Up @@ -148,34 +152,31 @@ Status FunctionMultiMatch::eval_inverted_index(FunctionContext* context,
auto* cache = InvertedIndexQueryCache::instance();
InvertedIndexQueryCacheHandle cache_handler;
if (cache->lookup(cache_key, &cache_handler)) {
params.result = cache_handler.get_bitmap();
result = cache_handler.get_bitmap();
return Status::OK();
}

// search
bool first = true;
for (const auto& column_name : match_param->fields) {
auto cid = tablet_schema->field_index(column_name);
const auto& column = *DORIS_TRY(tablet_schema->column(column_name));

auto& index_iterator = segment_iterator->inverted_index_iterators()[cid];
if (!index_iterator) {
RETURN_IF_ERROR(segment_iterator->_init_inverted_index_iterators(cid));
}
const auto& index_reader = index_iterator->reader();

auto result = std::make_shared<roaring::Roaring>();
RETURN_IF_ERROR(index_reader->query(opts.stats, opts.runtime_state, column_name,
match_param->query.data(), query_type, result));
if (first) {
(*params.result).swap(*result);
first = false;
} else {
(*params.result) |= (*result);
}
auto single_result = std::make_shared<roaring::Roaring>();
StringRef query_value(match_param->query.data());
RETURN_IF_ERROR(index_reader->query(opts.stats, opts.runtime_state,
std::to_string(column.unique_id()), &query_value,
query_type, single_result));
(*result) |= (*single_result);
}

params.result->runOptimize();
cache->insert(cache_key, params.result, &cache_handler);
result->runOptimize();
cache->insert(cache_key, result, &cache_handler);

return Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/functions/function_multi_match.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ class FunctionMultiMatch : public IFunction {

bool can_push_down_to_index() const override { return true; }

Status eval_inverted_index(FunctionContext* context,
segment_v2::FuncExprParams& params) override;
Status eval_inverted_index(FunctionContext* context, segment_v2::FuncExprParams& params,
std::shared_ptr<roaring::Roaring>& result) override;
};

} // namespace doris::vectorized
25 changes: 25 additions & 0 deletions regression-test/data/inverted_index_p0/test_index_multi_match.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
178

-- !sql --
180

-- !sql --
859

-- !sql --
44

-- !sql --
178

-- !sql --
180

-- !sql --
859

-- !sql --
44

124 changes: 124 additions & 0 deletions regression-test/suites/inverted_index_p0/test_index_multi_match.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


suite("test_index_multi_match", "p0"){
def indexTbName1 = "test_index_multi_match_1"
def indexTbName2 = "test_index_multi_match_2"

sql "DROP TABLE IF EXISTS ${indexTbName1}"
sql "DROP TABLE IF EXISTS ${indexTbName2}"

sql """
CREATE TABLE ${indexTbName1} (
`@timestamp` int(11) NULL COMMENT "",
`clientip` text NULL COMMENT "",
`request` text NULL COMMENT "",
`status` text NULL COMMENT "",
`size` text NULL COMMENT "",
INDEX clientip_idx (`clientip`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '',
INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '',
INDEX status_idx (`status`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '',
INDEX size_idx (`size`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`@timestamp`)
COMMENT "OLAP"
DISTRIBUTED BY RANDOM BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"disable_auto_compaction" = "true"
);
"""

sql """
CREATE TABLE ${indexTbName2} (
`@timestamp` int(11) NULL COMMENT "",
`clientip` text NULL COMMENT "",
`request` text NULL COMMENT "",
`status` text NULL COMMENT "",
`size` text NULL COMMENT "",
INDEX clientip_idx (`clientip`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '',
INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '',
INDEX status_idx (`status`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '',
INDEX size_idx (`size`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`@timestamp`)
COMMENT "OLAP"
DISTRIBUTED BY RANDOM BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"disable_auto_compaction" = "true"
);
"""

def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
expected_succ_rows = -1, load_to_single_tablet = 'true' ->

// load the json data
streamLoad {
table "${table_name}"

// set http request header params
set 'label', label + "_" + UUID.randomUUID().toString()
set 'read_json_by_line', read_flag
set 'format', format_flag
file file_name // import json file
time 10000 // limit inflight 10s
if (expected_succ_rows >= 0) {
set 'max_filter_ratio', '1'
}

// if declared a check callback, the default check condition will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
if (ignore_failure && expected_succ_rows < 0) { return }
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
if (expected_succ_rows >= 0) {
assertEquals(json.NumberLoadedRows, expected_succ_rows)
} else {
assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
}
}

try {
load_httplogs_data.call(indexTbName1, 'test_index_multi_match_1', 'true', 'json', 'documents-1000.json')
load_httplogs_data.call(indexTbName2, 'test_index_multi_match_2', 'true', 'json', 'documents-1000.json')

sql "sync"

qt_sql """ select count() from ${indexTbName1} where (clientip match_phrase_prefix '2'); """
qt_sql """ select count() from ${indexTbName1} where (clientip match_phrase_prefix '2' or request match_phrase_prefix '2'); """
qt_sql """ select count() from ${indexTbName1} where (clientip match_phrase_prefix '2' or request match_phrase_prefix '2' or status match_phrase_prefix '2' or size match_phrase_prefix '2'); """
qt_sql """ select count() from ${indexTbName1} where (clientip match_phrase_prefix 'a' or request match_phrase_prefix 'a' or status match_phrase_prefix 'a' or size match_phrase_prefix 'a'); """

qt_sql """ select count() from ${indexTbName2} where multi_match(clientip, '', 'phrase_prefix', '2'); """
qt_sql """ select count() from ${indexTbName2} where multi_match(clientip, 'request', 'phrase_prefix', '2'); """
qt_sql """ select count() from ${indexTbName2} where multi_match(clientip, 'request, status, size', 'phrase_prefix', '2'); """
qt_sql """ select count() from ${indexTbName2} where multi_match(clientip, 'request, status, size', 'phrase_prefix', 'a'); """

} finally {
//try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}

0 comments on commit 0c8fd93

Please sign in to comment.