diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp new file mode 100644 index 00000000000000..5867a77559b36d --- /dev/null +++ b/be/src/olap/partial_update_info.cpp @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/partial_update_info.h" + +#include + +#include "olap/tablet_schema.h" + +namespace doris { + +void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update, + const std::set& partial_update_cols, bool is_strict_mode, + int64_t timestamp_ms, const std::string& timezone, + const std::string& auto_increment_column, int64_t cur_max_version) { + is_partial_update = partial_update; + partial_update_input_columns = partial_update_cols; + max_version_in_flush_phase = cur_max_version; + this->timestamp_ms = timestamp_ms; + this->timezone = timezone; + missing_cids.clear(); + update_cids.clear(); + for (auto i = 0; i < tablet_schema.num_columns(); ++i) { + auto tablet_column = tablet_schema.column(i); + if (!partial_update_input_columns.contains(tablet_column.name())) { + missing_cids.emplace_back(i); + if (!tablet_column.has_default_value() && !tablet_column.is_nullable() && + tablet_schema.auto_increment_column() != tablet_column.name()) { + can_insert_new_rows_in_partial_update = false; + } + } else { + update_cids.emplace_back(i); + } + if (auto_increment_column == tablet_column.name()) { + is_schema_contains_auto_inc_column = true; + } + } + this->is_strict_mode = is_strict_mode; + is_input_columns_contains_auto_inc_column = + is_partial_update && partial_update_input_columns.contains(auto_increment_column); + _generate_default_values_for_missing_cids(tablet_schema); +} + +void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const { + partial_update_info_pb->set_is_partial_update(is_partial_update); + partial_update_info_pb->set_max_version_in_flush_phase(max_version_in_flush_phase); + for (const auto& col : partial_update_input_columns) { + partial_update_info_pb->add_partial_update_input_columns(col); + } + for (auto cid : missing_cids) { + partial_update_info_pb->add_missing_cids(cid); + } + for (auto cid : update_cids) { + partial_update_info_pb->add_update_cids(cid); + } + partial_update_info_pb->set_can_insert_new_rows_in_partial_update( + can_insert_new_rows_in_partial_update); + partial_update_info_pb->set_is_strict_mode(is_strict_mode); + partial_update_info_pb->set_timestamp_ms(timestamp_ms); + partial_update_info_pb->set_timezone(timezone); + partial_update_info_pb->set_is_input_columns_contains_auto_inc_column( + is_input_columns_contains_auto_inc_column); + partial_update_info_pb->set_is_schema_contains_auto_inc_column( + is_schema_contains_auto_inc_column); + for (const auto& value : default_values) { + partial_update_info_pb->add_default_values(value); + } +} + +void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) { + is_partial_update = partial_update_info_pb->is_partial_update(); + max_version_in_flush_phase = partial_update_info_pb->has_max_version_in_flush_phase() + ? partial_update_info_pb->max_version_in_flush_phase() + : -1; + partial_update_input_columns.clear(); + for (const auto& col : partial_update_info_pb->partial_update_input_columns()) { + partial_update_input_columns.insert(col); + } + missing_cids.clear(); + for (auto cid : partial_update_info_pb->missing_cids()) { + missing_cids.push_back(cid); + } + update_cids.clear(); + for (auto cid : partial_update_info_pb->update_cids()) { + update_cids.push_back(cid); + } + can_insert_new_rows_in_partial_update = + partial_update_info_pb->can_insert_new_rows_in_partial_update(); + is_strict_mode = partial_update_info_pb->is_strict_mode(); + timestamp_ms = partial_update_info_pb->timestamp_ms(); + timezone = partial_update_info_pb->timezone(); + is_input_columns_contains_auto_inc_column = + partial_update_info_pb->is_input_columns_contains_auto_inc_column(); + is_schema_contains_auto_inc_column = + partial_update_info_pb->is_schema_contains_auto_inc_column(); + default_values.clear(); + for (const auto& value : partial_update_info_pb->default_values()) { + default_values.push_back(value); + } +} + +std::string PartialUpdateInfo::summary() const { + return fmt::format( + "update_cids={}, missing_cids={}, is_strict_mode={}, max_version_in_flush_phase={}", + update_cids.size(), missing_cids.size(), is_strict_mode, max_version_in_flush_phase); +} + +void PartialUpdateInfo::_generate_default_values_for_missing_cids( + const TabletSchema& tablet_schema) { + for (unsigned int cur_cid : missing_cids) { + const auto& column = tablet_schema.column(cur_cid); + if (column.has_default_value()) { + std::string default_value; + if (UNLIKELY(tablet_schema.column(cur_cid).type() == + FieldType::OLAP_FIELD_TYPE_DATETIMEV2 && + to_lower(tablet_schema.column(cur_cid).default_value()) + .find(to_lower("CURRENT_TIMESTAMP")) != + std::string::npos)) { + DateV2Value dtv; + dtv.from_unixtime(timestamp_ms / 1000, timezone); + default_value = dtv.debug_string(); + } else if (UNLIKELY(tablet_schema.column(cur_cid).type() == + FieldType::OLAP_FIELD_TYPE_DATEV2 && + to_lower(tablet_schema.column(cur_cid).default_value()) + .find(to_lower("CURRENT_DATE")) != + std::string::npos)) { + DateV2Value dv; + dv.from_unixtime(timestamp_ms / 1000, timezone); + default_value = dv.debug_string(); + } else { + default_value = tablet_schema.column(cur_cid).default_value(); + } + default_values.emplace_back(default_value); + } else { + // place an empty string here + default_values.emplace_back(); + } + } + CHECK_EQ(missing_cids.size(), default_values.size()); +} +} // namespace doris diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 4b62cb8f0ffb31..987f31ec7f7eb9 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -16,78 +16,26 @@ // under the License. #pragma once - -#include "olap/tablet_schema.h" +#include +#include +#include +#include namespace doris { +class TabletSchema; +class PartialUpdateInfoPB; struct PartialUpdateInfo { void init(const TabletSchema& tablet_schema, bool partial_update, - const std::set& partial_update_cols, bool is_strict_mode, + const std::set& partial_update_cols, bool is_strict_mode, int64_t timestamp_ms, const std::string& timezone, - const std::string& auto_increment_column, int64_t cur_max_version = -1) { - is_partial_update = partial_update; - partial_update_input_columns = partial_update_cols; - max_version_in_flush_phase = cur_max_version; - this->timestamp_ms = timestamp_ms; - this->timezone = timezone; - missing_cids.clear(); - update_cids.clear(); - for (auto i = 0; i < tablet_schema.num_columns(); ++i) { - auto tablet_column = tablet_schema.column(i); - if (!partial_update_input_columns.contains(tablet_column.name())) { - missing_cids.emplace_back(i); - if (!tablet_column.has_default_value() && !tablet_column.is_nullable() && - tablet_schema.auto_increment_column() != tablet_column.name()) { - can_insert_new_rows_in_partial_update = false; - } - } else { - update_cids.emplace_back(i); - } - if (auto_increment_column == tablet_column.name()) { - is_schema_contains_auto_inc_column = true; - } - } - this->is_strict_mode = is_strict_mode; - is_input_columns_contains_auto_inc_column = - is_partial_update && partial_update_input_columns.contains(auto_increment_column); - _generate_default_values_for_missing_cids(tablet_schema); - } + const std::string& auto_increment_column, int64_t cur_max_version = -1); + void to_pb(PartialUpdateInfoPB* partial_update_info) const; + void from_pb(PartialUpdateInfoPB* partial_update_info); + std::string summary() const; private: - void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema) { - for (auto i = 0; i < missing_cids.size(); ++i) { - auto cur_cid = missing_cids[i]; - const auto& column = tablet_schema.column(cur_cid); - if (column.has_default_value()) { - std::string default_value; - if (UNLIKELY(tablet_schema.column(cur_cid).type() == - FieldType::OLAP_FIELD_TYPE_DATETIMEV2 && - to_lower(tablet_schema.column(cur_cid).default_value()) - .find(to_lower("CURRENT_TIMESTAMP")) != - std::string::npos)) { - DateV2Value dtv; - dtv.from_unixtime(timestamp_ms / 1000, timezone); - default_value = dtv.debug_string(); - } else if (UNLIKELY(tablet_schema.column(cur_cid).type() == - FieldType::OLAP_FIELD_TYPE_DATEV2 && - to_lower(tablet_schema.column(cur_cid).default_value()) - .find(to_lower("CURRENT_DATE")) != - std::string::npos)) { - DateV2Value dv; - dv.from_unixtime(timestamp_ms / 1000, timezone); - default_value = dv.debug_string(); - } else { - default_value = tablet_schema.column(cur_cid).default_value(); - } - default_values.emplace_back(default_value); - } else { - // place an empty string here - default_values.emplace_back(); - } - } - CHECK_EQ(missing_cids.size(), default_values.size()); - } + void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema); public: bool is_partial_update {false}; diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp b/be/src/olap/rowset/rowset_meta_manager.cpp index 38911327d84c65..d89be5ab8ecd93 100644 --- a/be/src/olap/rowset/rowset_meta_manager.cpp +++ b/be/src/olap/rowset/rowset_meta_manager.cpp @@ -535,4 +535,98 @@ Status RowsetMetaManager::load_json_rowset_meta(OlapMeta* meta, return status; } +Status RowsetMetaManager::save_partial_update_info( + OlapMeta* meta, int64_t tablet_id, int64_t partition_id, int64_t txn_id, + const PartialUpdateInfoPB& partial_update_info_pb) { + std::string key = + fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id); + std::string value; + if (!partial_update_info_pb.SerializeToString(&value)) { + return Status::Error( + "serialize partial update info failed. key={}", key); + } + VLOG_NOTICE << "save partial update info, key=" << key << ", value_size=" << value.size(); + return meta->put(META_COLUMN_FAMILY_INDEX, key, value); +} + +Status RowsetMetaManager::try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id, + int64_t partition_id, int64_t txn_id, + PartialUpdateInfoPB* partial_update_info_pb) { + std::string key = + fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id); + std::string value; + Status status = meta->get(META_COLUMN_FAMILY_INDEX, key, &value); + if (status.is()) { + return status; + } + if (!status.ok()) { + LOG_WARNING("failed to get partial update info. tablet_id={}, partition_id={}, txn_id={}", + tablet_id, partition_id, txn_id); + return status; + } + if (!partial_update_info_pb->ParseFromString(value)) { + return Status::Error( + "fail to parse partial update info content to protobuf object. tablet_id={}, " + "partition_id={}, txn_id={}", + tablet_id, partition_id, txn_id); + } + return Status::OK(); +} + +Status RowsetMetaManager::traverse_partial_update_info( + OlapMeta* meta, + std::function const& func) { + auto traverse_partial_update_info_func = [&func](const std::string& key, + const std::string& value) -> bool { + std::vector parts; + // key format: pui_{tablet_id}_{partition_id}_{txn_id} + RETURN_IF_ERROR(split_string(key, '_', &parts)); + if (parts.size() != 4) { + LOG_WARNING("invalid rowset key={}, splitted size={}", key, parts.size()); + return true; + } + int64_t tablet_id = std::stoll(parts[1]); + int64_t partition_id = std::stoll(parts[2]); + int64_t txn_id = std::stoll(parts[3]); + return func(tablet_id, partition_id, txn_id, value); + }; + return meta->iterate(META_COLUMN_FAMILY_INDEX, PARTIAL_UPDATE_INFO_PREFIX, + traverse_partial_update_info_func); +} + +Status RowsetMetaManager::remove_partial_update_info(OlapMeta* meta, int64_t tablet_id, + int64_t partition_id, int64_t txn_id) { + std::string key = + fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id); + Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key); + VLOG_NOTICE << "remove partial update info, key=" << key; + return res; +} + +Status RowsetMetaManager::remove_partial_update_infos( + OlapMeta* meta, const std::vector>& keys) { + std::vector remove_keys; + for (auto [tablet_id, partition_id, txn_id] : keys) { + remove_keys.push_back(fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, + partition_id, txn_id)); + } + Status res = meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys); + VLOG_NOTICE << "remove partial update info, remove_keys.size()=" << remove_keys.size(); + return res; +} + +Status RowsetMetaManager::remove_tablet_related_partial_update_info(OlapMeta* meta, + int64_t tablet_id) { + std::string prefix = fmt::format("{}{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id); + std::vector remove_keys; + auto get_remove_keys_func = [&](const std::string& key, const std::string& value) -> bool { + remove_keys.emplace_back(key); + return true; + }; + VLOG_NOTICE << "remove tablet related partial update info, tablet_id: " << tablet_id + << " removed keys size: " << remove_keys.size(); + RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, prefix, get_remove_keys_func)); + return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys); +} + } // namespace doris diff --git a/be/src/olap/rowset/rowset_meta_manager.h b/be/src/olap/rowset/rowset_meta_manager.h index 9517ce3f51a2d6..0cfbb3383e3935 100644 --- a/be/src/olap/rowset/rowset_meta_manager.h +++ b/be/src/olap/rowset/rowset_meta_manager.h @@ -18,6 +18,8 @@ #ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H #define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H +#include + #include #include #include @@ -32,11 +34,15 @@ namespace doris { class OlapMeta; class RowsetMetaPB; +class PartialUpdateInfoPB; } // namespace doris namespace doris { namespace { const std::string ROWSET_PREFIX = "rst_"; + +const std::string PARTIAL_UPDATE_INFO_PREFIX = "pui_"; + } // namespace // Helper class for managing rowset meta of one root path. @@ -80,6 +86,21 @@ class RowsetMetaManager { static Status load_json_rowset_meta(OlapMeta* meta, const std::string& rowset_meta_path); + static Status save_partial_update_info(OlapMeta* meta, int64_t tablet_id, int64_t partition_id, + int64_t txn_id, + const PartialUpdateInfoPB& partial_update_info_pb); + static Status try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id, + int64_t partition_id, int64_t txn_id, + PartialUpdateInfoPB* partial_update_info_pb); + static Status traverse_partial_update_info( + OlapMeta* meta, + std::function const& func); + static Status remove_partial_update_info(OlapMeta* meta, int64_t tablet_id, + int64_t partition_id, int64_t txn_id); + static Status remove_partial_update_infos( + OlapMeta* meta, const std::vector>& keys); + static Status remove_tablet_related_partial_update_info(OlapMeta* meta, int64_t tablet_id); + private: static Status _save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id, const RowsetMetaPB& rowset_meta_pb); diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 32bbdb246a37af..4194d3ae6c3009 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -40,6 +40,7 @@ #include "olap/rowset/beta_rowset_writer.h" #include "olap/rowset/pending_rowset_helper.h" #include "olap/rowset/rowset_meta.h" +#include "olap/rowset/rowset_meta_manager.h" #include "olap/rowset/rowset_writer.h" #include "olap/rowset/rowset_writer_context.h" #include "olap/schema_change.h" @@ -325,10 +326,11 @@ Status RowsetBuilder::commit_txn() { // => update_schema: A(bigint), B(double), C(int), D(int) RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema)); } + // Transfer ownership of `PendingRowsetGuard` to `TxnManager` - Status res = _engine.txn_manager()->commit_txn(_req.partition_id, *tablet(), _req.txn_id, - _req.load_id, _rowset, - std::move(_pending_rs_guard), false); + Status res = _engine.txn_manager()->commit_txn( + _req.partition_id, *tablet(), _req.txn_id, _req.load_id, _rowset, + std::move(_pending_rs_guard), false, _partial_update_info); if (!res && !res.is()) { LOG(WARNING) << "Failed to commit txn: " << _req.txn_id diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 82c07a59152455..f4b11b8fb62145 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -63,6 +63,7 @@ #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/olap_meta.h" +#include "olap/rowset/rowset_fwd.h" #include "olap/rowset/rowset_meta.h" #include "olap/rowset/rowset_meta_manager.h" #include "olap/rowset/unique_rowset_id_generator.h" @@ -807,6 +808,9 @@ Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) { // cleand unused pending publish info for deleted tablet _clean_unused_pending_publish_info(); + // clean unused partial update info for finished txns + _clean_unused_partial_update_info(); + // clean unused rowsets in remote storage backends for (auto data_dir : get_stores()) { data_dir->perform_remote_rowset_gc(); @@ -970,6 +974,34 @@ void StorageEngine::_clean_unused_pending_publish_info() { } } +void StorageEngine::_clean_unused_partial_update_info() { + std::vector> remove_infos; + auto unused_partial_update_info_collector = + [this, &remove_infos](int64_t tablet_id, int64_t partition_id, int64_t txn_id, + std::string_view value) -> bool { + TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); + if (tablet == nullptr) { + remove_infos.emplace_back(tablet_id, partition_id, txn_id); + return true; + } + TxnState txn_state = + _txn_manager->get_txn_state(partition_id, txn_id, tablet_id, tablet->tablet_uid()); + if (txn_state == TxnState::NOT_FOUND || txn_state == TxnState::ABORTED || + txn_state == TxnState::DELETED) { + remove_infos.emplace_back(tablet_id, partition_id, txn_id); + return true; + } + return true; + }; + auto data_dirs = get_stores(); + for (auto* data_dir : data_dirs) { + static_cast(RowsetMetaManager::traverse_partial_update_info( + data_dir->get_meta(), unused_partial_update_info_collector)); + static_cast( + RowsetMetaManager::remove_partial_update_infos(data_dir->get_meta(), remove_infos)); + } +} + void StorageEngine::gc_binlogs(const std::unordered_map& gc_tablet_infos) { for (auto [tablet_id, version] : gc_tablet_infos) { LOG(INFO) << fmt::format("start to gc binlogs for tablet_id: {}, version: {}", tablet_id, diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index f647869e82500c..5562257133c5fb 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -255,6 +255,8 @@ class StorageEngine { void _clean_unused_pending_publish_info(); + void _clean_unused_partial_update_info(); + Status _do_sweep(const std::string& scan_root, const time_t& local_tm_now, const int32_t expire); diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 2ed1ac5674d53f..373c398df61fb1 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -33,9 +33,11 @@ #include "common/config.h" #include "common/logging.h" +#include "common/status.h" #include "olap/data_dir.h" #include "olap/delta_writer.h" #include "olap/olap_common.h" +#include "olap/partial_update_info.h" #include "olap/rowset/pending_rowset_helper.h" #include "olap/rowset/rowset_meta.h" #include "olap/rowset/rowset_meta_manager.h" @@ -173,10 +175,11 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet, TTransactionId transaction_id, const PUniqueId& load_id, const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, - bool is_recovery) { + bool is_recovery, + std::shared_ptr partial_update_info) { return commit_txn(tablet.data_dir()->get_meta(), partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid(), load_id, rowset_ptr, - std::move(guard), is_recovery); + std::move(guard), is_recovery, partial_update_info); } Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, @@ -259,7 +262,8 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, - bool is_recovery) { + bool is_recovery, + std::shared_ptr partial_update_info) { if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) { LOG(WARNING) << "invalid commit req " << " partition_id=" << partition_id << " transaction_id=" << transaction_id @@ -369,6 +373,36 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, save_status.append(fmt::format(", txn id: {}", transaction_id)); return save_status; } + + if (partial_update_info && partial_update_info->is_partial_update) { + PartialUpdateInfoPB partial_update_info_pb; + partial_update_info->to_pb(&partial_update_info_pb); + save_status = RowsetMetaManager::save_partial_update_info( + meta, tablet_id, partition_id, transaction_id, partial_update_info_pb); + if (!save_status.ok()) { + save_status.append(fmt::format(", txn_id: {}", transaction_id)); + return save_status; + } + } + } + + TabletSharedPtr tablet; + std::shared_ptr decoded_partial_update_info {nullptr}; + if (is_recovery) { + tablet = _engine.tablet_manager()->get_tablet(tablet_id, tablet_uid); + if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { + PartialUpdateInfoPB partial_update_info_pb; + auto st = RowsetMetaManager::try_get_partial_update_info( + meta, tablet_id, partition_id, transaction_id, &partial_update_info_pb); + if (st.ok()) { + decoded_partial_update_info = std::make_shared(); + decoded_partial_update_info->from_pb(&partial_update_info_pb); + DCHECK(decoded_partial_update_info->is_partial_update); + } else if (!st.is()) { + // the load is not a partial update + return st; + } + } } { @@ -376,11 +410,17 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, auto load_info = std::make_shared(load_id, rowset_ptr); load_info->pending_rs_guard = std::move(guard); if (is_recovery) { - TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_info.tablet_id, - tablet_info.tablet_uid); if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { load_info->unique_key_merge_on_write = true; load_info->delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id())); + if (decoded_partial_update_info) { + LOG_INFO( + "get partial update info from RocksDB during recovery. txn_id={}, " + "partition_id={}, tablet_id={}, partial_update_info=[{}]", + transaction_id, partition_id, tablet_id, + decoded_partial_update_info->summary()); + load_info->partial_update_info = decoded_partial_update_info; + } } } load_info->commit(); @@ -513,6 +553,20 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, return status; } + if (tablet_txn_info->unique_key_merge_on_write && tablet_txn_info->partial_update_info && + tablet_txn_info->partial_update_info->is_partial_update) { + status = RowsetMetaManager::remove_partial_update_info(meta, tablet_id, partition_id, + transaction_id); + if (!status) { + // discard the error status and print the warning log + LOG_WARNING( + "fail to remove partial update info from RocksDB. txn_id={}, rowset_id={}, " + "tablet_id={}, tablet_uid={}", + transaction_id, rowset->rowset_id().to_string(), tablet_id, + tablet_uid.to_string()); + } + } + // TODO(Drogon): remove these test codes if (enable_binlog) { auto version_str = fmt::format("{}", version.first); @@ -692,6 +746,13 @@ void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId ta } } } + if (meta != nullptr) { + Status st = RowsetMetaManager::remove_tablet_related_partial_update_info(meta, tablet_id); + if (!st.ok()) { + LOG_WARNING("failed to partial update info, tablet_id={}, err={}", tablet_id, + st.to_string()); + } + } } void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id, diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 431ce6e49cf43d..ab34113c7e76c9 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -36,7 +36,6 @@ #include "common/status.h" #include "olap/olap_common.h" -#include "olap/partial_update_info.h" #include "olap/rowset/pending_rowset_helper.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_meta.h" @@ -52,6 +51,7 @@ namespace doris { class DeltaWriter; class OlapMeta; struct TabletPublishStatistics; +struct PartialUpdateInfo; enum class TxnState { NOT_FOUND = 0, @@ -143,8 +143,8 @@ class TxnManager { Status commit_txn(TPartitionId partition_id, const Tablet& tablet, TTransactionId transaction_id, const PUniqueId& load_id, - const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, - bool is_recovery); + const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, bool is_recovery, + std::shared_ptr partial_update_info = nullptr); Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id, const Version& version, @@ -159,8 +159,8 @@ class TxnManager { Status commit_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, - const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, - bool is_recovery); + const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, bool is_recovery, + std::shared_ptr partial_update_info = nullptr); // remove a txn from txn manager // not persist rowset meta because diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 4d0d37071323b0..55aee8f07d2714 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -391,3 +391,18 @@ message RowsetBinlogMetasPB { repeated RowsetBinlogMetaPB rowset_binlog_metas = 1; } + +message PartialUpdateInfoPB { + optional bool is_partial_update = 1 [default = false]; + repeated string partial_update_input_columns = 2; + repeated uint32 missing_cids = 3; + repeated uint32 update_cids = 4; + optional bool can_insert_new_rows_in_partial_update = 5 [default = false]; + optional bool is_strict_mode = 6 [default = false]; + optional int64 timestamp_ms = 7 [default = 0]; + optional string timezone = 8; + optional bool is_input_columns_contains_auto_inc_column = 9 [default = false]; + optional bool is_schema_contains_auto_inc_column = 10 [default = false]; + repeated string default_values = 11; + optional int64 max_version_in_flush_phase = 12 [default = -1]; +} diff --git a/regression-test/data/unique_with_mow_p0/partial_update/data1.csv b/regression-test/data/unique_with_mow_p0/partial_update/data1.csv new file mode 100644 index 00000000000000..be9f2feb69a788 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/data1.csv @@ -0,0 +1,2 @@ +1,10,10 +3,30,30 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.out new file mode 100644 index 00000000000000..6444b41c2c233b --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.out @@ -0,0 +1,21 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 1 1 99 99 +2 2 2 88 88 +3 3 3 77 77 + +-- !sql -- +1 10 10 99 99 +2 2 2 88 88 +3 30 30 77 77 + diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.groovy new file mode 100644 index 00000000000000..bc2a44425b30c8 --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.groovy @@ -0,0 +1,156 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.Date +import java.text.SimpleDateFormat +import org.apache.http.HttpResponse +import org.apache.http.client.methods.HttpPut +import org.apache.http.impl.client.CloseableHttpClient +import org.apache.http.impl.client.HttpClients +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.apache.http.client.config.RequestConfig +import org.apache.http.client.RedirectStrategy +import org.apache.http.protocol.HttpContext +import org.apache.http.HttpRequest +import org.apache.http.impl.client.LaxRedirectStrategy +import org.apache.http.client.methods.RequestBuilder +import org.apache.http.entity.StringEntity +import org.apache.http.client.methods.CloseableHttpResponse +import org.apache.http.util.EntityUtils +import org.apache.doris.regression.suite.ClusterOptions + +suite("test_partial_update_conflict_be_restart") { + def dbName = context.config.getDbNameByFile(context.file) + + def options = new ClusterOptions() + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = false + docker(options) { + def table1 = "test_partial_update_conflict_be_restart" + sql "DROP TABLE IF EXISTS ${table1};" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);" + order_qt_sql "select * from ${table1};" + + def do_streamload_2pc_commit = { txnId -> + def feNode = sql_return_maparray("show frontends;").get(0) + def command = "curl -X PUT --location-trusted -u root:" + + " -H txn_id:${txnId}" + + " -H txn_operation:commit" + + " http://${feNode.Host}:${feNode.HttpPort}/api/${dbName}/${table1}/_stream_load_2pc" + log.info("http_stream execute 2pc: ${command}") + + def process = command.execute() + code = process.waitFor() + out = process.text + json2pc = parseJson(out) + log.info("http_stream 2pc result: ${out}".toString()) + assertEquals(code, 0) + assertEquals("success", json2pc.status.toLowerCase()) + } + + def wait_for_publish = {txnId, waitSecond -> + String st = "PREPARE" + while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && waitSecond > 0) { + Thread.sleep(1000) + waitSecond -= 1 + def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}" + assertNotNull(result) + st = result[0].TransactionStatus + } + log.info("Stream load with txn ${txnId} is ${st}") + assertEquals(st, "VISIBLE") + } + + String txnId1 + streamLoad { + table "${table1}" + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k1,c1,c2' + set 'strict_mode', "false" + set 'two_phase_commit', 'true' + file 'data1.csv' + time 10000 // limit inflight 10s + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + txnId1 = json.TxnId + assertEquals("success", json.Status.toLowerCase()) + } + } + sql "sync;" + order_qt_sql "select * from ${table1};" + + // another partial update that conflicts with the previous load and publishes successfully + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c3,c4) values(1, 99, 99),(2,88,88),(3,77,77);" + sql "set enable_unique_key_partial_update=false;" + sql "sync;" + order_qt_sql "select * from ${table1};" + + // restart backend + cluster.restartBackends() + Thread.sleep(5000) + + // wait for be restart + boolean ok = false + int cnt = 0 + for (; cnt < 10; cnt++) { + def be = sql_return_maparray("show backends").get(0) + if (be.Alive.toBoolean()) { + ok = true + break; + } + logger.info("wait for BE restart...") + Thread.sleep(1000) + } + if (!ok) { + logger.info("BE failed to restart") + assertTrue(false) + } + + Thread.sleep(5000) + + do_streamload_2pc_commit(txnId1) + wait_for_publish(txnId1, 10) + + + sql "sync;" + order_qt_sql "select * from ${table1};" + sql "DROP TABLE IF EXISTS ${table1};" + } +}