Skip to content

Commit

Permalink
handle s3 errors in GC (ydb-platform#13890)
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Feb 3, 2025
1 parent 74ff898 commit a0f48cc
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 48 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace NKqp {
UPSERT OBJECT `secretKey` (TYPE SECRET) WITH (value = `fakeSecret`);
CREATE EXTERNAL DATA SOURCE `)" + tierName + R"(` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="http://fake.fake/fake",
LOCATION="http://fake.fake/olap-)" + tierName + R"(",
AUTH_METHOD="AWS",
AWS_ACCESS_KEY_ID_SECRET_NAME="accessKey",
AWS_SECRET_ACCESS_KEY_SECRET_NAME="secretKey",
Expand Down
54 changes: 47 additions & 7 deletions ydb/core/kqp/ut/olap/tiering_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,18 @@ class TTieringTestHelper {
}
}

void CheckAllDataInTier(const TString& tierName) {
void CheckAllDataInTier(const TString& tierName, const bool onlyActive=true) {
NYdb::NTable::TTableClient tableClient = TestHelper->GetKikimr().GetTableClient();

auto selectQuery = TString(R"(

auto selectQuery = TStringBuilder();
selectQuery << R"(
SELECT
TierName, SUM(ColumnRawBytes) AS RawBytes, SUM(Rows) AS Rows
FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats`
WHERE Activity == 1
GROUP BY TierName
)");
FROM `/Root/olapStore/olapTable/.sys/primary_index_portion_stats`)";
if (onlyActive) {
selectQuery << " WHERE Activity == 1";
}
selectQuery << " GROUP BY TierName";

auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
Expand Down Expand Up @@ -304,6 +306,44 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) {
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 0);
}
}

// Scenario test for garbage collection of blobs stored in an external tier.
// It tracks the size of the fake external storage bucket "olap-tier1" through four phases:
//   1. data is evicted to tier1                         -> bucket is non-empty;
//   2. tiering is reset while GC is disabled            -> bucket stays non-empty;
//   3. GC is enabled but the storage is unavailable     -> bucket stays non-empty;
//   4. the storage becomes available again              -> bucket is expected to become empty.
Y_UNIT_TEST(TieringGC) {
TTieringTestHelper tieringHelper;
auto& csController = tieringHelper.GetCsController();
// Shorten the controller's staleness / periodic wakeup overrides to one second each.
csController->SetOverrideMaxReadStaleness(TDuration::Seconds(1));
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
auto& olapHelper = tieringHelper.GetOlapHelper();
auto& testHelper = tieringHelper.GetTestHelper();

olapHelper.CreateTestOlapTable();
testHelper.CreateTier("tier1");
tieringHelper.WriteSampleData();

// Phase 1: enable tiering by the "timestamp" column and wait for compaction and actualization.
testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
csController->WaitCompactions(TDuration::Seconds(5));
csController->WaitActualization(TDuration::Seconds(5));
// onlyActive=false: the portion stats query is issued without the Activity filter.
tieringHelper.CheckAllDataInTier("/Root/tier1", false);
UNIT_ASSERT_GT(Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetBucket("olap-tier1").GetSize(), 0);

// Phase 2: with the GC background process disabled, reset tiering; portions are reported in
// "__DEFAULT" while the bucket must still hold data.
csController->DisableBackground(NYDBTest::ICSController::EBackground::GC);
testHelper.ResetTiering("/Root/olapStore/olapTable");
csController->WaitActualization(TDuration::Seconds(5));

tieringHelper.CheckAllDataInTier("__DEFAULT", false);
UNIT_ASSERT_GT(Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetBucket("olap-tier1").GetSize(), 0);

// Phase 3: GC is enabled again, but the external storage is flagged unavailable; after waiting
// for cleaning the bucket must still be non-empty.
csController->EnableBackground(NYDBTest::ICSController::EBackground::GC);
csController->SetExternalStorageUnavailable(true);
testHelper.ResetTiering("/Root/olapStore/olapTable");
csController->WaitCleaning(TDuration::Seconds(5));
UNIT_ASSERT_GT(Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetBucket("olap-tier1").GetSize(), 0);

// Phase 4: clear the unavailable flag and wait (60 second limit) for the bucket size to reach zero.
// NOTE(review): the result of WaitCondition is not checked here; presumably it fails the test
// itself when the condition is not reached in time - confirm against ICSController.
csController->SetExternalStorageUnavailable(false);
testHelper.ResetTiering("/Root/olapStore/olapTable");
csController->WaitCondition(TDuration::Seconds(60), []() {
return Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->GetBucket("olap-tier1").GetSize() == 0;
});
}
}

} // namespace NKikimr::NKqp
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/tier/common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#include "common.h"

namespace NKikimr::NOlap::NBlobOperations::NTier {}
26 changes: 26 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/tier/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <ydb/core/wrappers/abstract.h>

#include <ydb/library/actors/core/log.h>

namespace NKikimr::NOlap::NBlobOperations::NTier {

// Holds a replaceable pointer to an external storage operator.
// Every read and write of the pointer is performed while holding Mutex, so an owner can swap the
// operator with Emplace() while other holders of this object keep fetching the current one via Get().
class TExternalStorageOperatorHolder {
public:
    // Stores `storageOperator`, replacing whatever pointer was held before.
    void Emplace(const NWrappers::NExternalStorage::IExternalStorageOperator::TPtr& storageOperator) {
        TGuard<TAdaptiveLock> guard(Mutex);
        StorageOperator = storageOperator;
    }

    // Returns a copy of the currently stored pointer; it is null until Emplace() has been called.
    NWrappers::NExternalStorage::IExternalStorageOperator::TPtr Get() const {
        TGuard<TAdaptiveLock> guard(Mutex);
        return StorageOperator;
    }

private:
    // The held operator; starts out null.
    NWrappers::NExternalStorage::IExternalStorageOperator::TPtr StorageOperator = nullptr;
    // Guards StorageOperator.
    TAdaptiveLock Mutex;
};

} // namespace NKikimr::NOlap::NBlobOperations::NTier
9 changes: 7 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/tier/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <ydb/core/tx/columnshard/blobs_action/abstract/gc.h>
#include <ydb/core/tx/columnshard/blobs_action/counters/remove_gc.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/blobs_action/tier/common.h>
#include <ydb/core/wrappers/abstract.h>
#include <ydb/library/accessor/accessor.h>

Expand All @@ -12,7 +13,7 @@ class TGCTask: public IBlobsGCAction {
using TBase = IBlobsGCAction;
private:
YDB_READONLY_DEF(std::deque<TUnifiedBlobId>, DraftBlobIds);
YDB_READONLY_DEF(NWrappers::NExternalStorage::IExternalStorageOperator::TPtr, ExternalStorageOperator);
std::shared_ptr<TExternalStorageOperatorHolder> ExternalStorageOperator;
protected:
virtual void DoOnExecuteTxAfterCleaning(NColumnShard::TColumnShard& self, TBlobManagerDb& dbBlobs) override;
virtual bool DoOnCompleteTxAfterCleaning(NColumnShard::TColumnShard& self, const std::shared_ptr<IBlobsGCAction>& taskAction) override;
Expand All @@ -27,7 +28,7 @@ class TGCTask: public IBlobsGCAction {
return DraftBlobIds.empty();
}
public:
TGCTask(const TString& storageId, std::deque<TUnifiedBlobId>&& draftBlobIds, const NWrappers::NExternalStorage::IExternalStorageOperator::TPtr& externalStorageOperator,
TGCTask(const TString& storageId, std::deque<TUnifiedBlobId>&& draftBlobIds, const std::shared_ptr<TExternalStorageOperatorHolder>& externalStorageOperator,
TBlobsCategories&& blobsToRemove, const std::shared_ptr<TRemoveGCCounters>& counters)
: TBase(storageId, std::move(blobsToRemove), counters)
, DraftBlobIds(std::move(draftBlobIds))
Expand All @@ -37,6 +38,10 @@ class TGCTask: public IBlobsGCAction {
Counters->OnRequest(i.BlobSize());
}
}

// Returns the operator currently stored in the holder this task was constructed with.
// The holder is queried on every call, so the result reflects the holder's contents at call time.
NWrappers::NExternalStorage::IExternalStorageOperator::TPtr GetExternalStorageOperator() const {
    auto currentOperator = ExternalStorageOperator->Get();
    return currentOperator;
}
};

}
61 changes: 54 additions & 7 deletions ydb/core/tx/columnshard/blobs_action/tier/gc_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,33 @@
namespace NKikimr::NOlap::NBlobOperations::NTier {

void TGarbageCollectionActor::Handle(NWrappers::NExternalStorage::TEvDeleteObjectResponse::TPtr& ev) {
AFL_VERIFY(ev->Get()->Key);
const TString& key = *ev->Get()->Key;

if (!ev->Get()->IsSuccess()) {
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD_BLOBS_TIER)("actor", "TGarbageCollectionActor")("event", "s3_error")("storage_id",
GCTask->GetStorageId())("message", ev->Get()->GetError().GetMessage())("exception", ev->Get()->GetError().GetExceptionName());
const auto& error = ev->Get()->GetError();

bool isRemoved = false;
switch (error.GetErrorType()) {
case Aws::S3::S3Errors::NO_SUCH_BUCKET:
case Aws::S3::S3Errors::NO_SUCH_KEY:
isRemoved = true;
break;
default:
break;
}

if (isRemoved) {
// Do nothing
} else {
auto delay = NextRetryDelay(error, key).value_or(TDuration::Seconds(30));
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_BLOBS_TIER)("actor", "TGarbageCollectionActor")("event", "error")(
"exception", error.GetExceptionName())("message", error.GetMessage())("key", key);
Schedule(delay, new NWrappers::NExternalStorage::TEvDeleteObjectRequest(Aws::S3::Model::DeleteObjectRequest().WithKey(key)));
return;
}
}

TLogoBlobID logoBlobId;
TString errorMessage;
Y_ABORT_UNLESS(ev->Get()->Key);
Expand All @@ -16,6 +39,11 @@ void TGarbageCollectionActor::Handle(NWrappers::NExternalStorage::TEvDeleteObjec
CheckFinished();
}

// Handles a delete request delivered to this actor itself (the response handler schedules such
// an event to retry a failed deletion) by starting the deletion of the requested key again.
void TGarbageCollectionActor::Handle(NWrappers::NExternalStorage::TEvDeleteObjectRequest::TPtr& ev) {
    // A request without a key cannot be mapped to a blob, so treat it as a hard error.
    AFL_VERIFY(ev->Get()->Request.KeyHasBeenSet());
    const auto& awsRequest = ev->Get()->Request;
    const TString objectKey(awsRequest.GetKey());
    StartDeletingObject(objectKey);
}

void TGarbageCollectionActor::Bootstrap(const TActorContext& ctx) {
for (auto i = GCTask->GetBlobsToRemove().GetDirect().GetIterator(); i.IsValid(); ++i) {
BlobIdsToRemove.emplace(i.GetBlobId().GetLogoBlobId());
Expand All @@ -25,11 +53,7 @@ void TGarbageCollectionActor::Bootstrap(const TActorContext& ctx) {
BlobIdsToRemove.emplace(i.GetLogoBlobId());
}
for (auto&& i : BlobIdsToRemove) {
auto awsRequest = Aws::S3::Model::DeleteObjectRequest().WithKey(i.ToString());
auto request = std::make_unique<NWrappers::NExternalStorage::TEvDeleteObjectRequest>(awsRequest);
auto hRequest = std::make_unique<IEventHandle>(NActors::TActorId(), TActorContext::AsActorContext().SelfID, request.release());
TAutoPtr<TEventHandle<NWrappers::NExternalStorage::TEvDeleteObjectRequest>> evPtr((TEventHandle<NWrappers::NExternalStorage::TEvDeleteObjectRequest>*)hRequest.release());
GCTask->GetExternalStorageOperator()->Execute(evPtr);
StartDeletingObject(i.ToString());
}
TBase::Bootstrap(ctx);
Become(&TGarbageCollectionActor::StateWork);
Expand All @@ -43,4 +67,27 @@ void TGarbageCollectionActor::CheckFinished() {
}
}

void TGarbageCollectionActor::StartDeletingObject(const TString& key) const {
auto awsRequest = Aws::S3::Model::DeleteObjectRequest().WithKey(key);
auto request = std::make_unique<NWrappers::NExternalStorage::TEvDeleteObjectRequest>(awsRequest);
auto hRequest = std::make_unique<IEventHandle>(NActors::TActorId(), TActorContext::AsActorContext().SelfID, request.release());
TAutoPtr<TEventHandle<NWrappers::NExternalStorage::TEvDeleteObjectRequest>> evPtr(
(TEventHandle<NWrappers::NExternalStorage::TEvDeleteObjectRequest>*)hRequest.release());
GCTask->GetExternalStorageOperator()->Execute(evPtr);
}

// Computes how long to wait before retrying the deletion of `key` after the failure `reason`.
// Returns std::nullopt when the error reports that it should not be retried, or when the per-key
// retry state yields no further delay. The retry state for a key is created on its first failure
// and kept in RetryStateByKey for subsequent ones.
std::optional<TDuration> TGarbageCollectionActor::NextRetryDelay(const Aws::S3::S3Error& reason, const TString& key) {
    if (reason.ShouldRetry()) {
        auto* retryState = RetryStateByKey.FindPtr(key);
        if (retryState == nullptr) {
            // First failure for this key: register a fresh state produced by the policy.
            retryState = &RetryStateByKey.emplace(key, RetryPolicy->CreateRetryState()).first->second;
        }
        const auto nextDelay = (*retryState)->GetNextRetryDelay();
        if (nextDelay) {
            return *nextDelay;
        }
    }
    return std::nullopt;
}

}
18 changes: 16 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/tier/gc_actor.h
Original file line number Diff line number Diff line change
@@ -1,26 +1,39 @@
#pragma once
#include "gc.h"

#include <ydb/core/tx/columnshard/blobs_action/abstract/gc_actor.h>
#include <ydb/core/base/blobstorage.h>
#include <ydb/core/tx/columnshard/blob_cache.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/gc_actor.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
#include <ydb/core/base/blobstorage.h>

#include <library/cpp/retry/retry_policy.h>

namespace NKikimr::NOlap::NBlobOperations::NTier {

class TGarbageCollectionActor: public TSharedBlobsCollectionActor<TGarbageCollectionActor> {
private:
using TBase = TSharedBlobsCollectionActor<TGarbageCollectionActor>;
using IRetryPolicy = IRetryPolicy<>;

const NActors::TActorId TabletActorId;
std::shared_ptr<TGCTask> GCTask;

IRetryPolicy::TPtr RetryPolicy = IRetryPolicy::GetExponentialBackoffPolicy([]() {
return ERetryErrorClass::ShortRetry;
});
THashMap<TString, IRetryPolicy::IRetryState::TPtr> RetryStateByKey;

THashSet<TLogoBlobID> BlobIdsToRemove;
void Handle(NWrappers::NExternalStorage::TEvDeleteObjectResponse::TPtr& ev);
void Handle(NWrappers::NExternalStorage::TEvDeleteObjectRequest::TPtr& ev);
void CheckFinished();
void StartDeletingObject(const TString& key) const;
std::optional<TDuration> NextRetryDelay(const Aws::S3::S3Error& reason, const TString& key);

virtual void DoOnSharedRemovingFinished() override {
CheckFinished();
}

public:
TGarbageCollectionActor(const std::shared_ptr<TGCTask>& task, const NActors::TActorId& tabletActorId, const TTabletId selfTabletId)
: TBase(task->GetStorageId(), selfTabletId, task->GetBlobsToRemove().GetBorrowed(), task)
Expand All @@ -33,6 +46,7 @@ class TGarbageCollectionActor: public TSharedBlobsCollectionActor<TGarbageCollec
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(NWrappers::NExternalStorage::TEvDeleteObjectResponse, Handle);
hFunc(NWrappers::NExternalStorage::TEvDeleteObjectRequest, Handle);
default:
TBase::StateWork(ev);
}
Expand Down
57 changes: 31 additions & 26 deletions ydb/core/tx/columnshard/blobs_action/tier/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace NKikimr::NOlap::NBlobOperations::NTier {

NWrappers::NExternalStorage::IExternalStorageOperator::TPtr TOperator::GetCurrentOperator() const {
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
return ExternalStorageOperator;
return ExternalStorageOperator->Get();
}

std::shared_ptr<IBlobsDeclareRemovingAction> TOperator::DoStartDeclareRemovingAction(const std::shared_ptr<NBlobOperations::TRemoveDeclareCounters>& counters) {
Expand All @@ -41,7 +41,7 @@ std::shared_ptr<IBlobsGCAction> TOperator::DoCreateGCAction(const std::shared_pt
}
categories = GetSharedBlobs()->BuildRemoveCategories(std::move(deleteBlobIds));
}
auto gcTask = std::make_shared<TGCTask>(GetStorageId(), std::move(draftBlobIds), GetCurrentOperator(), std::move(categories), counters);
auto gcTask = std::make_shared<TGCTask>(GetStorageId(), std::move(draftBlobIds), ExternalStorageOperator, std::move(categories), counters);
if (gcTask->IsEmpty()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_BLOBS_TIER)("event", "start_gc_skipped")("reason", "task_empty");
return nullptr;
Expand All @@ -56,55 +56,60 @@ void TOperator::DoStartGCAction(const std::shared_ptr<IBlobsGCAction>& action) c
}

void TOperator::InitNewExternalOperator(const NColumnShard::NTiers::TManager* tierManager) {
NWrappers::NExternalStorage::IExternalStorageOperator::TPtr extStorageOperator;
std::optional<NKikimrSchemeOp::TS3Settings> settings;

if (tierManager && tierManager->IsReady()) {
settings = tierManager->GetS3Settings();
if (auto op = NYDBTest::TControllers::GetColumnShardController()->GetStorageOperatorOverride(GetStorageId())) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_BLOBS_TIER)("event", "override_external_operator")("storage", GetStorageId());
DoInitNewExternalOperator(op, std::nullopt);
} else if (tierManager && tierManager->IsReady()) {
const NKikimrSchemeOp::TS3Settings& settings = tierManager->GetS3Settings();
{
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
if (CurrentS3Settings && CurrentS3Settings->SerializeAsString() == settings->SerializeAsString()) {
if (CurrentS3Settings && CurrentS3Settings->SerializeAsString() == settings.SerializeAsString()) {
return;
}
}
auto extStorageConfig = NWrappers::NExternalStorage::IExternalStorageConfig::Construct(*settings);
auto extStorageConfig = NWrappers::NExternalStorage::IExternalStorageConfig::Construct(settings);
AFL_VERIFY(extStorageConfig);
extStorageOperator = extStorageConfig->ConstructStorageOperator(false);
DoInitNewExternalOperator(extStorageConfig->ConstructStorageOperator(false), settings);
} else {
extStorageOperator = std::make_shared<NWrappers::NExternalStorage::TUnavailableExternalStorageOperator>(
NWrappers::NExternalStorage::TUnavailableExternalStorageOperator(
"tier_unavailable", TStringBuilder() << "Tier is not configured: " << GetStorageId()));
DoInitNewExternalOperator(std::make_shared<NWrappers::NExternalStorage::TUnavailableExternalStorageOperator>(
NWrappers::NExternalStorage::TUnavailableExternalStorageOperator(
"tier_unavailable", TStringBuilder() << "Tier is not configured: " << GetStorageId())),
std::nullopt);
}

extStorageOperator->InitReplyAdapter(std::make_shared<NOlap::NBlobOperations::NTier::TRepliesAdapter>(GetStorageId()));
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
CurrentS3Settings = settings;
ExternalStorageOperator = extStorageOperator;
}

void TOperator::InitNewExternalOperator() {
AFL_VERIFY(InitializationConfig);
auto extStorageOperator = InitializationConfig->ConstructStorageOperator(false);
extStorageOperator->InitReplyAdapter(std::make_shared<NOlap::NBlobOperations::NTier::TRepliesAdapter>(GetStorageId()));
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
ExternalStorageOperator = extStorageOperator;
DoInitNewExternalOperator(InitializationConfig->ConstructStorageOperator(false), std::nullopt);
}

// Installs `storageOperator` as the operator used by this storage:
//   1. attaches a reply adapter bound to this storage id to the operator;
//   2. records `settings` as the current S3 settings under ChangeOperatorLock;
//   3. publishes the operator through the shared holder.
void TOperator::DoInitNewExternalOperator(const NWrappers::NExternalStorage::IExternalStorageOperator::TPtr& storageOperator,
    const std::optional<NKikimrSchemeOp::TS3Settings>& settings) {
    using TAdapter = NOlap::NBlobOperations::NTier::TRepliesAdapter;

    auto replyAdapter = std::make_shared<TAdapter>(GetStorageId());
    storageOperator->InitReplyAdapter(std::move(replyAdapter));

    {
        // Only the settings update is covered by this lock; the holder takes its own lock below.
        TGuard<TSpinLock> settingsGuard(ChangeOperatorLock);
        CurrentS3Settings = settings;
    }

    ExternalStorageOperator->Emplace(storageOperator);
}

TOperator::TOperator(const TString& storageId, const NColumnShard::TColumnShard& shard, const std::shared_ptr<NDataSharing::TStorageSharedBlobsManager>& storageSharedBlobsManager)
TOperator::TOperator(const TString& storageId, const NColumnShard::TColumnShard& shard,
const std::shared_ptr<NDataSharing::TStorageSharedBlobsManager>& storageSharedBlobsManager)
: TBase(storageId, storageSharedBlobsManager)
, TabletActorId(shard.SelfId())
, Generation(shard.Executor()->Generation())
{
, ExternalStorageOperator(std::make_shared<TExternalStorageOperatorHolder>()) {
InitNewExternalOperator(shard.GetTierManagerPointer(storageId));
}

TOperator::TOperator(const TString& storageId, const TActorId& shardActorId, const std::shared_ptr<NWrappers::IExternalStorageConfig>& storageConfig,
TOperator::TOperator(const TString& storageId, const TActorId& shardActorId,
const std::shared_ptr<NWrappers::IExternalStorageConfig>& storageConfig,
const std::shared_ptr<NDataSharing::TStorageSharedBlobsManager>& storageSharedBlobsManager, const ui64 generation)
: TBase(storageId, storageSharedBlobsManager)
, TabletActorId(shardActorId)
, Generation(generation)
, InitializationConfig(storageConfig)
{
, ExternalStorageOperator(std::make_shared<TExternalStorageOperatorHolder>()) {
InitNewExternalOperator();
}

Expand Down
Loading

0 comments on commit a0f48cc

Please sign in to comment.