Skip to content

Commit

Permalink
[branch-2.1] Picks "[Fix](partial update) Persist partial_update_info…
Browse files Browse the repository at this point in the history
… in RocksDB in case of BE restart after a partial update has committed #38331" (#39035)

picks #38331 and
#39066
  • Loading branch information
bobhan1 authored Aug 8, 2024
1 parent 4668ebd commit 1fbfb81
Show file tree
Hide file tree
Showing 13 changed files with 586 additions and 77 deletions.
155 changes: 155 additions & 0 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/partial_update_info.h"

#include <gen_cpp/olap_file.pb.h>

#include "olap/tablet_schema.h"

namespace doris {

// Populates this PartialUpdateInfo from the tablet schema and the load settings.
//
// Every schema column is classified by whether its name appears in
// `partial_update_cols`: indexes of columns that are present go to `update_cids`,
// all others go to `missing_cids`. Afterwards one default value per missing
// column is generated.
//
// @param tablet_schema          schema whose columns are classified
// @param partial_update         stored into `is_partial_update`
// @param partial_update_cols    names of the columns supplied by the load
// @param is_strict_mode         stored into `is_strict_mode`
// @param timestamp_ms           load timestamp; used when generating
//                               CURRENT_TIMESTAMP / CURRENT_DATE defaults
// @param timezone               time zone used together with `timestamp_ms`
// @param auto_increment_column  name of the auto-increment column
// @param cur_max_version        stored into `max_version_in_flush_phase`
void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update,
                             const std::set<string>& partial_update_cols, bool is_strict_mode,
                             int64_t timestamp_ms, const std::string& timezone,
                             const std::string& auto_increment_column, int64_t cur_max_version) {
    is_partial_update = partial_update;
    partial_update_input_columns = partial_update_cols;
    max_version_in_flush_phase = cur_max_version;
    this->timestamp_ms = timestamp_ms;
    this->timezone = timezone;
    this->is_strict_mode = is_strict_mode;

    missing_cids.clear();
    update_cids.clear();
    // NOTE(review): `can_insert_new_rows_in_partial_update` is only ever set to
    // false and `is_schema_contains_auto_inc_column` only ever set to true below;
    // neither is reset here, so calling init() again on the same object keeps the
    // values from the previous call. Confirm callers never re-init.
    for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
        // Bind by const reference: the column is only inspected, so copying the
        // whole column object on every iteration is unnecessary.
        const auto& tablet_column = tablet_schema.column(i);
        const auto& column_name = tablet_column.name();
        if (partial_update_input_columns.contains(column_name)) {
            update_cids.emplace_back(i);
        } else {
            missing_cids.emplace_back(i);
            // A missing column that has no default, is not nullable and is not
            // the schema's auto-increment column cannot be filled in for a row
            // that does not exist yet.
            if (!tablet_column.has_default_value() && !tablet_column.is_nullable() &&
                tablet_schema.auto_increment_column() != column_name) {
                can_insert_new_rows_in_partial_update = false;
            }
        }
        if (auto_increment_column == column_name) {
            is_schema_contains_auto_inc_column = true;
        }
    }
    is_input_columns_contains_auto_inc_column =
            is_partial_update && partial_update_input_columns.contains(auto_increment_column);
    _generate_default_values_for_missing_cids(tablet_schema);
}

// Copies every field of this PartialUpdateInfo into `partial_update_info_pb`.
// Repeated fields are appended to, so the caller is expected to pass a message
// whose repeated fields are empty.
void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
    auto& pb = *partial_update_info_pb;

    // Scalar fields.
    pb.set_is_partial_update(is_partial_update);
    pb.set_max_version_in_flush_phase(max_version_in_flush_phase);
    pb.set_can_insert_new_rows_in_partial_update(can_insert_new_rows_in_partial_update);
    pb.set_is_strict_mode(is_strict_mode);
    pb.set_timestamp_ms(timestamp_ms);
    pb.set_timezone(timezone);
    pb.set_is_input_columns_contains_auto_inc_column(is_input_columns_contains_auto_inc_column);
    pb.set_is_schema_contains_auto_inc_column(is_schema_contains_auto_inc_column);

    // Repeated fields, each appended in container order.
    for (const auto& column_name : partial_update_input_columns) {
        pb.add_partial_update_input_columns(column_name);
    }
    for (auto missing_cid : missing_cids) {
        pb.add_missing_cids(missing_cid);
    }
    for (auto update_cid : update_cids) {
        pb.add_update_cids(update_cid);
    }
    for (const auto& default_value : default_values) {
        pb.add_default_values(default_value);
    }
}

// Overwrites every field of this PartialUpdateInfo with the contents of
// `partial_update_info_pb`. Containers are replaced, not merged.
void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
    const auto& pb = *partial_update_info_pb;

    is_partial_update = pb.is_partial_update();

    // Fall back to -1 when the message does not carry the field.
    max_version_in_flush_phase = -1;
    if (pb.has_max_version_in_flush_phase()) {
        max_version_in_flush_phase = pb.max_version_in_flush_phase();
    }

    const auto& input_columns = pb.partial_update_input_columns();
    partial_update_input_columns.clear();
    partial_update_input_columns.insert(input_columns.begin(), input_columns.end());

    const auto& pb_missing_cids = pb.missing_cids();
    missing_cids.assign(pb_missing_cids.begin(), pb_missing_cids.end());

    const auto& pb_update_cids = pb.update_cids();
    update_cids.assign(pb_update_cids.begin(), pb_update_cids.end());

    can_insert_new_rows_in_partial_update = pb.can_insert_new_rows_in_partial_update();
    is_strict_mode = pb.is_strict_mode();
    timestamp_ms = pb.timestamp_ms();
    timezone = pb.timezone();
    is_input_columns_contains_auto_inc_column = pb.is_input_columns_contains_auto_inc_column();
    is_schema_contains_auto_inc_column = pb.is_schema_contains_auto_inc_column();

    const auto& pb_default_values = pb.default_values();
    default_values.assign(pb_default_values.begin(), pb_default_values.end());
}

std::string PartialUpdateInfo::summary() const {
return fmt::format(
"update_cids={}, missing_cids={}, is_strict_mode={}, max_version_in_flush_phase={}",
update_cids.size(), missing_cids.size(), is_strict_mode, max_version_in_flush_phase);
}

// Rebuilds `default_values` so that it holds exactly one entry per id in
// `missing_cids`, in the same order.
//
// For a column with a default value:
//   - DATETIMEV2 whose default contains "CURRENT_TIMESTAMP" (case-insensitive)
//     gets the load timestamp rendered as a datetime in `timezone`;
//   - DATEV2 whose default contains "CURRENT_DATE" (case-insensitive) gets the
//     load timestamp rendered as a date in `timezone`;
//   - anything else gets the column's default value string as-is.
// A column without a default value gets an empty string placeholder.
void PartialUpdateInfo::_generate_default_values_for_missing_cids(
        const TabletSchema& tablet_schema) {
    // Start from an empty list: init() clears `missing_cids` before calling this,
    // so leftovers from an earlier call would break the one-to-one mapping and
    // trip the CHECK_EQ below.
    default_values.clear();
    default_values.reserve(missing_cids.size());
    for (unsigned int cur_cid : missing_cids) {
        const auto& column = tablet_schema.column(cur_cid);
        if (!column.has_default_value()) {
            // place an empty string here
            default_values.emplace_back();
            continue;
        }
        if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
                     to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) !=
                             std::string::npos)) {
            DateV2Value<DateTimeV2ValueType> dtv;
            // timestamp_ms is in milliseconds; from_unixtime is given seconds.
            dtv.from_unixtime(timestamp_ms / 1000, timezone);
            default_values.emplace_back(dtv.debug_string());
        } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 &&
                            to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) !=
                                    std::string::npos)) {
            DateV2Value<DateV2ValueType> dv;
            dv.from_unixtime(timestamp_ms / 1000, timezone);
            default_values.emplace_back(dv.debug_string());
        } else {
            default_values.emplace_back(column.default_value());
        }
    }
    CHECK_EQ(missing_cids.size(), default_values.size());
}
} // namespace doris
76 changes: 12 additions & 64 deletions be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,78 +16,26 @@
// under the License.

#pragma once

#include "olap/tablet_schema.h"
#include <cstdint>
#include <set>
#include <string>
#include <vector>

namespace doris {
class TabletSchema;
class PartialUpdateInfoPB;

struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<string>& partial_update_cols, bool is_strict_mode,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
const std::string& auto_increment_column, int64_t cur_max_version = -1) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;
max_version_in_flush_phase = cur_max_version;
this->timestamp_ms = timestamp_ms;
this->timezone = timezone;
missing_cids.clear();
update_cids.clear();
for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
auto tablet_column = tablet_schema.column(i);
if (!partial_update_input_columns.contains(tablet_column.name())) {
missing_cids.emplace_back(i);
if (!tablet_column.has_default_value() && !tablet_column.is_nullable() &&
tablet_schema.auto_increment_column() != tablet_column.name()) {
can_insert_new_rows_in_partial_update = false;
}
} else {
update_cids.emplace_back(i);
}
if (auto_increment_column == tablet_column.name()) {
is_schema_contains_auto_inc_column = true;
}
}
this->is_strict_mode = is_strict_mode;
is_input_columns_contains_auto_inc_column =
is_partial_update && partial_update_input_columns.contains(auto_increment_column);
_generate_default_values_for_missing_cids(tablet_schema);
}
const std::string& auto_increment_column, int64_t cur_max_version = -1);
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
void from_pb(PartialUpdateInfoPB* partial_update_info);
std::string summary() const;

private:
void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema) {
for (auto i = 0; i < missing_cids.size(); ++i) {
auto cur_cid = missing_cids[i];
const auto& column = tablet_schema.column(cur_cid);
if (column.has_default_value()) {
std::string default_value;
if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
to_lower(tablet_schema.column(cur_cid).default_value())
.find(to_lower("CURRENT_TIMESTAMP")) !=
std::string::npos)) {
DateV2Value<DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.debug_string();
} else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
FieldType::OLAP_FIELD_TYPE_DATEV2 &&
to_lower(tablet_schema.column(cur_cid).default_value())
.find(to_lower("CURRENT_DATE")) !=
std::string::npos)) {
DateV2Value<DateV2ValueType> dv;
dv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dv.debug_string();
} else {
default_value = tablet_schema.column(cur_cid).default_value();
}
default_values.emplace_back(default_value);
} else {
// place an empty string here
default_values.emplace_back();
}
}
CHECK_EQ(missing_cids.size(), default_values.size());
}
void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema);

public:
bool is_partial_update {false};
Expand Down
94 changes: 94 additions & 0 deletions be/src/olap/rowset/rowset_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,4 +535,98 @@ Status RowsetMetaManager::load_json_rowset_meta(OlapMeta* meta,
return status;
}

// Serializes `partial_update_info_pb` and stores it in `meta` under the key
// pui_{tablet_id}_{partition_id}_{txn_id}.
// Returns SERIALIZE_PROTOBUF_ERROR if serialization fails, otherwise the status
// of the put.
Status RowsetMetaManager::save_partial_update_info(
        OlapMeta* meta, int64_t tablet_id, int64_t partition_id, int64_t txn_id,
        const PartialUpdateInfoPB& partial_update_info_pb) {
    const std::string key =
            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
    std::string serialized;
    const bool serialize_ok = partial_update_info_pb.SerializeToString(&serialized);
    if (!serialize_ok) {
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>(
                "serialize partial update info failed. key={}", key);
    }
    VLOG_NOTICE << "save partial update info, key=" << key
                << ", value_size=" << serialized.size();
    return meta->put(META_COLUMN_FAMILY_INDEX, key, serialized);
}

// Looks up the record stored under pui_{tablet_id}_{partition_id}_{txn_id} and
// parses it into `partial_update_info_pb`.
// A META_KEY_NOT_FOUND status is returned to the caller without logging; any
// other lookup failure is logged and returned. Returns PARSE_PROTOBUF_ERROR if
// the stored bytes cannot be parsed.
Status RowsetMetaManager::try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
                                                      int64_t partition_id, int64_t txn_id,
                                                      PartialUpdateInfoPB* partial_update_info_pb) {
    const std::string key =
            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
    std::string raw_value;
    Status get_status = meta->get(META_COLUMN_FAMILY_INDEX, key, &raw_value);
    if (!get_status.ok()) {
        // A missing key is an expected outcome for a "try" lookup: stay quiet.
        if (!get_status.is<META_KEY_NOT_FOUND>()) {
            LOG_WARNING(
                    "failed to get partial update info. tablet_id={}, partition_id={}, txn_id={}",
                    tablet_id, partition_id, txn_id);
        }
        return get_status;
    }
    const bool parsed = partial_update_info_pb->ParseFromString(raw_value);
    if (parsed) {
        return Status::OK();
    }
    return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
            "fail to parse partial update info content to protobuf object. tablet_id={}, "
            "partition_id={}, txn_id={}",
            tablet_id, partition_id, txn_id);
}

// Iterates over every record in `meta` whose key starts with
// PARTIAL_UPDATE_INFO_PREFIX and hands the decoded key parts plus the raw value
// to `func` as (tablet_id, partition_id, txn_id, value).
// Keys that do not split into exactly four '_'-separated parts are logged and
// skipped. The bool returned by `func` is passed straight back to
// OlapMeta::iterate (presumably true means "keep iterating" — confirm against
// OlapMeta::iterate).
Status RowsetMetaManager::traverse_partial_update_info(
        OlapMeta* meta,
        std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func) {
    auto traverse_partial_update_info_func = [&func](const std::string& key,
                                                     const std::string& value) -> bool {
        std::vector<std::string> parts;
        // key format: pui_{tablet_id}_{partition_id}_{txn_id}
        RETURN_IF_ERROR(split_string(key, '_', &parts));
        if (parts.size() != 4) {
            // The message names the kind of key actually being traversed here.
            LOG_WARNING("invalid partial update info key={}, split size={}", key, parts.size());
            return true;
        }
        // NOTE(review): std::stoll throws on non-numeric or out-of-range input;
        // keys written by save_partial_update_info are always numeric, but a
        // foreign key under the same prefix would raise here — confirm.
        int64_t tablet_id = std::stoll(parts[1]);
        int64_t partition_id = std::stoll(parts[2]);
        int64_t txn_id = std::stoll(parts[3]);
        return func(tablet_id, partition_id, txn_id, value);
    };
    return meta->iterate(META_COLUMN_FAMILY_INDEX, PARTIAL_UPDATE_INFO_PREFIX,
                         traverse_partial_update_info_func);
}

// Deletes the record stored under pui_{tablet_id}_{partition_id}_{txn_id} and
// returns the status of the delete. The key is logged whether or not the delete
// succeeded.
Status RowsetMetaManager::remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
                                                     int64_t partition_id, int64_t txn_id) {
    const std::string key =
            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
    Status remove_status = meta->remove(META_COLUMN_FAMILY_INDEX, key);
    VLOG_NOTICE << "remove partial update info, key=" << key;
    return remove_status;
}

// Deletes the records for a batch of (tablet_id, partition_id, txn_id) tuples
// with a single multi-key remove call and returns its status.
Status RowsetMetaManager::remove_partial_update_infos(
        OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys) {
    std::vector<std::string> remove_keys;
    // The final size is known up front, so allocate once.
    remove_keys.reserve(keys.size());
    for (const auto& [tablet_id, partition_id, txn_id] : keys) {
        remove_keys.emplace_back(fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id,
                                             partition_id, txn_id));
    }
    Status res = meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
    VLOG_NOTICE << "remove partial update info, remove_keys.size()=" << remove_keys.size();
    return res;
}

// Deletes every partial update info record that belongs to `tablet_id`.
// Collects all keys under the tablet's key prefix, then removes them with one
// multi-key remove call. Returns the iterate error if collection fails,
// otherwise the status of the remove.
Status RowsetMetaManager::remove_tablet_related_partial_update_info(OlapMeta* meta,
                                                                    int64_t tablet_id) {
    // key format: pui_{tablet_id}_{partition_id}_{txn_id}
    // The trailing '_' is required: without it the prefix for tablet 123 would
    // also match the keys of tablets 1234, 12345, ... and delete their records.
    std::string prefix = fmt::format("{}{}_", PARTIAL_UPDATE_INFO_PREFIX, tablet_id);
    std::vector<std::string> remove_keys;
    auto get_remove_keys_func = [&](const std::string& key,
                                    const std::string& /*value*/) -> bool {
        remove_keys.emplace_back(key);
        return true;
    };
    RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, prefix, get_remove_keys_func));
    // Log after the scan so the reported count reflects the keys actually found.
    VLOG_NOTICE << "remove tablet related partial update info, tablet_id: " << tablet_id
                << " removed keys size: " << remove_keys.size();
    return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
}

} // namespace doris
21 changes: 21 additions & 0 deletions be/src/olap/rowset/rowset_meta_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H
#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H

#include <gen_cpp/olap_file.pb.h>

#include <cstdint>
#include <functional>
#include <string>
Expand All @@ -32,11 +34,15 @@
namespace doris {
class OlapMeta;
class RowsetMetaPB;
class PartialUpdateInfoPB;
} // namespace doris

namespace doris {
// Key prefixes for records kept in the meta store.
// NOTE(review): this anonymous namespace lives in a header, so every
// translation unit that includes it gets its own copy of these strings.
namespace {
// Key prefix for rowset meta records (presumably rst_{tablet_uid}_{rowset_id} —
// confirm against the rowset meta key builder).
const std::string ROWSET_PREFIX = "rst_";

// Key prefix for persisted partial update info records; full keys have the
// form pui_{tablet_id}_{partition_id}_{txn_id}.
const std::string PARTIAL_UPDATE_INFO_PREFIX = "pui_";

} // namespace

// Helper class for managing rowset meta of one root path.
Expand Down Expand Up @@ -80,6 +86,21 @@ class RowsetMetaManager {

static Status load_json_rowset_meta(OlapMeta* meta, const std::string& rowset_meta_path);

// Serializes `partial_update_info_pb` and stores it in `meta` under the key
// pui_{tablet_id}_{partition_id}_{txn_id}. Returns SERIALIZE_PROTOBUF_ERROR if
// serialization fails, otherwise the status of the put.
static Status save_partial_update_info(OlapMeta* meta, int64_t tablet_id, int64_t partition_id,
int64_t txn_id,
const PartialUpdateInfoPB& partial_update_info_pb);
// Reads the record stored under pui_{tablet_id}_{partition_id}_{txn_id} and
// parses it into `partial_update_info_pb`. A failed lookup (including
// META_KEY_NOT_FOUND) returns the lookup status; unparsable content returns
// PARSE_PROTOBUF_ERROR.
static Status try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
int64_t partition_id, int64_t txn_id,
PartialUpdateInfoPB* partial_update_info_pb);
// Calls `func(tablet_id, partition_id, txn_id, value)` for each record whose
// key starts with the partial update info prefix. Keys that do not split into
// four '_'-separated parts are logged and skipped.
static Status traverse_partial_update_info(
OlapMeta* meta,
std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func);
// Deletes the single record stored under pui_{tablet_id}_{partition_id}_{txn_id}.
static Status remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
int64_t partition_id, int64_t txn_id);
// Deletes the records for a batch of (tablet_id, partition_id, txn_id) tuples
// with one multi-key remove call.
static Status remove_partial_update_infos(
OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys);
// Deletes every stored record whose key starts with the key prefix built from
// `tablet_id`.
static Status remove_tablet_related_partial_update_info(OlapMeta* meta, int64_t tablet_id);

private:
static Status _save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
const RowsetMetaPB& rowset_meta_pb);
Expand Down
Loading

0 comments on commit 1fbfb81

Please sign in to comment.