diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index 56274b9a939a6c..7a2119d6273e74 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -68,6 +68,35 @@ using namespace ErrorCode; SnapshotManager* SnapshotManager::_s_instance = nullptr; std::mutex SnapshotManager::_mlock; +LocalSnapshotLockGuard LocalSnapshotLock::acquire(const std::string& path) { + std::unique_lock l(_lock); + auto& ctx = _local_snapshot_contexts[path]; + while (ctx._is_locked) { + ctx._waiting_count++; + ctx._cv.wait(l); + ctx._waiting_count--; + } + + ctx._is_locked = true; + return {path}; +} + +void LocalSnapshotLock::release(const std::string& path) { + std::lock_guard l(_lock); + auto iter = _local_snapshot_contexts.find(path); + if (iter == _local_snapshot_contexts.end()) { + return; + } + + auto& ctx = iter->second; + ctx._is_locked = false; + if (ctx._waiting_count > 0) { + ctx._cv.notify_one(); + } else { + _local_snapshot_contexts.erase(iter); + } +} + SnapshotManager* SnapshotManager::instance() { if (_s_instance == nullptr) { std::lock_guard lock(_mlock); @@ -124,6 +153,8 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* s } Status SnapshotManager::release_snapshot(const string& snapshot_path) { + auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(snapshot_path); + // If the requested snapshot_path is located in the root/snapshot folder, it is considered legal and can be deleted. // Otherwise, it is considered an illegal request and returns an error result. SCOPED_ATTACH_TASK(_mem_tracker); @@ -470,7 +501,7 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet } } // be would definitely set it as true no matter has missed version or not - // but it would take no effets on the following range loop + // but it would take no effects on the following range loop if (!is_single_rowset_clone && request.__isset.missing_version) { for (int64_t missed_version : request.missing_version) { Version version = {missed_version, missed_version}; diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h index e32409dd3cd6c1..6eaec3b1fdf1aa 100644 --- a/be/src/olap/snapshot_manager.h +++ b/be/src/olap/snapshot_manager.h @@ -19,6 +19,7 @@ #include +#include #include #include #include @@ -36,6 +37,55 @@ class RowsetMetaPB; class TSnapshotRequest; struct RowsetId; +class LocalSnapshotLockGuard; + +// A simple lock to protect the local snapshot path. +class LocalSnapshotLock { + friend class LocalSnapshotLockGuard; + +public: + LocalSnapshotLock() = default; + ~LocalSnapshotLock() = default; + LocalSnapshotLock(const LocalSnapshotLock&) = delete; + LocalSnapshotLock& operator=(const LocalSnapshotLock&) = delete; + + static LocalSnapshotLock& instance() { + static LocalSnapshotLock instance; + return instance; + } + + // Acquire the lock for the specified path. It will block if the lock is already held by another. + LocalSnapshotLockGuard acquire(const std::string& path); + +private: + void release(const std::string& path); + + class LocalSnapshotContext { + public: + bool _is_locked = false; + size_t _waiting_count = 0; + std::condition_variable _cv; + + LocalSnapshotContext() = default; + LocalSnapshotContext(const LocalSnapshotContext&) = delete; + LocalSnapshotContext& operator=(const LocalSnapshotContext&) = delete; + }; + + std::mutex _lock; + std::unordered_map _local_snapshot_contexts; +}; + +class LocalSnapshotLockGuard { +public: + LocalSnapshotLockGuard(std::string path) : _snapshot_path(std::move(path)) {} + LocalSnapshotLockGuard(const LocalSnapshotLockGuard&) = delete; + LocalSnapshotLockGuard& operator=(const LocalSnapshotLockGuard&) = delete; + ~LocalSnapshotLockGuard() { LocalSnapshotLock::instance().release(_snapshot_path); } + +private: + std::string _snapshot_path; +}; + class SnapshotManager { public: ~SnapshotManager() {} diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index 1764e3d4322e14..2a6ba9274e288b 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -31,6 +31,7 @@ #include #include +#include #include #include #include @@ -150,6 +151,9 @@ Status SnapshotLoader::upload(const std::map& src_to_d const std::string& src_path = iter->first; const std::string& dest_path = iter->second; + // Take a lock to protect the local snapshot path. + auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(src_path); + int64_t tablet_id = 0; int32_t schema_hash = 0; RETURN_IF_ERROR( @@ -247,6 +251,9 @@ Status SnapshotLoader::download(const std::map& src_to const std::string& remote_path = iter->first; const std::string& local_path = iter->second; + // Take a lock to protect the local snapshot path. + auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(local_path); + int64_t local_tablet_id = 0; int32_t schema_hash = 0; RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(local_path, &local_tablet_id, @@ -403,8 +410,6 @@ Status SnapshotLoader::download(const std::map& src_to Status SnapshotLoader::remote_http_download( const std::vector& remote_tablet_snapshots, std::vector* downloaded_tablet_ids) { - LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, task id: {}", _job_id, - _task_id); constexpr uint32_t kListRemoteFileTimeout = 15; constexpr uint32_t kDownloadFileMaxRetry = 3; constexpr uint32_t kGetLengthTimeout = 10; @@ -414,35 +419,39 @@ Status SnapshotLoader::remote_http_download( RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD)); Status status = Status::OK(); - // Step before, validate all remote - - // Step 1: Validate local tablet snapshot paths + int report_counter = 0; + int finished_num = 0; + int total_num = remote_tablet_snapshots.size(); for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { - const auto& path = remote_tablet_snapshot.local_snapshot_path; + const auto& local_path = remote_tablet_snapshot.local_snapshot_path; + const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; + LOG(INFO) << fmt::format( + "download snapshots via http. job: {}, task id: {}, local dir: {}, remote dir: {}", + _job_id, _task_id, local_path, remote_path); + + // Take a lock to protect the local snapshot path. + auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(local_path); + + // Step 1: Validate local tablet snapshot paths bool res = true; - RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res)); + RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(local_path, &res)); if (!res) { std::stringstream ss; auto err_msg = - fmt::format("snapshot path is not directory or does not exist: {}", path); + fmt::format("snapshot path is not directory or does not exist: {}", local_path); LOG(WARNING) << err_msg; return Status::RuntimeError(err_msg); } - } - // Step 2: get all local files - struct LocalFileStat { - uint64_t size; - std::string md5; - }; - std::unordered_map> local_files_map; - for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { - const auto& local_path = remote_tablet_snapshot.local_snapshot_path; - std::vector local_files; - RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files)); - - auto& local_filestat = local_files_map[local_path]; - for (auto& local_file : local_files) { + // Step 2: get all local files + struct LocalFileStat { + uint64_t size; + std::string md5; + }; + std::unordered_map local_files; + std::vector existing_files; + RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &existing_files)); + for (auto& local_file : existing_files) { // add file size std::string local_file_path = local_path + "/" + local_file; std::error_code ec; @@ -459,27 +468,20 @@ Status SnapshotLoader::remote_http_download( << " md5sum: " << status.to_string(); return status; } - local_filestat[local_file] = {local_file_size, md5}; + local_files[local_file] = {local_file_size, md5}; } - } - - // Step 3: Validate remote tablet snapshot paths && remote files map - // key is remote snapshot paths, value is filelist - // get all these use http download action - // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr - int report_counter = 0; - int total_num = remote_tablet_snapshots.size(); - int finished_num = 0; - struct RemoteFileStat { - std::string url; - std::string md5; - uint64_t size; - }; - std::unordered_map> - remote_files_map; - for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { - const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; - auto& remote_files = remote_files_map[remote_path]; + existing_files.clear(); + + // Step 3: Validate remote tablet snapshot paths && remote files map + // key is remote snapshot paths, value is filelist + // get all these use http download action + // http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr + struct RemoteFileStat { + std::string url; + std::string md5; + uint64_t size; + }; + std::unordered_map remote_files; const auto& token = remote_tablet_snapshot.remote_token; const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr; @@ -520,19 +522,11 @@ Status SnapshotLoader::remote_http_download( remote_files[filename] = RemoteFileStat {remote_file_url, file_md5, file_size}; } - } - // Step 4: Compare local and remote files && get all need download files - for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) { + // Step 4: Compare local and remote files && get all need download files RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num, TTaskType::type::DOWNLOAD)); - const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path; - const auto& local_path = remote_tablet_snapshot.local_snapshot_path; - auto& remote_files = remote_files_map[remote_path]; - auto& local_files = local_files_map[local_path]; - auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id; - // get all need download files std::vector need_download_files; for (const auto& [remote_file, remote_filestat] : remote_files) { @@ -661,6 +655,7 @@ Status SnapshotLoader::remote_http_download( if (total_time_ms > 0) { copy_rate = total_file_size / ((double)total_time_ms) / 1000; } + auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id; LOG(INFO) << fmt::format( "succeed to copy remote tablet {} to local tablet {}, total file size: {} B, cost: " "{} ms, rate: {} MB/s", @@ -710,6 +705,9 @@ Status SnapshotLoader::remote_http_download( // MUST hold tablet's header lock, push lock, cumulative lock and base compaction lock Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr tablet, bool overwrite) { + // Take a lock to protect the local snapshot path. + auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(snapshot_path); + auto tablet_path = tablet->tablet_path(); auto store_path = tablet->data_dir()->path(); LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", to: " << tablet_path