Skip to content

Commit

Permalink
blob writing error processing for portion-write-mode (#12029)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 27, 2024
1 parent ae1af5f commit e92e224
Show file tree
Hide file tree
Showing 14 changed files with 155 additions and 33 deletions.
23 changes: 23 additions & 0 deletions ydb/core/kqp/ut/olap/write_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,29 @@
namespace NKikimr::NKqp {

Y_UNIT_TEST_SUITE(KqpOlapWrite) {
Y_UNIT_TEST(WriteFails) {
auto csController = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NKikimr::NOlap::TWaitCompactionController>();
csController->SetSmallSizeDetector(1000000);
csController->SetIndexWriteControllerEnabled(false);
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideBlobPutResultOnWriteValue(NKikimrProto::EReplyStatus::BLOCKED);
Singleton<NKikimr::NWrappers::NExternalStorage::TFakeExternalStorage>()->ResetWriteCounters();

auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
kikimr.GetTestServer().GetRuntime()->GetAppData().FeatureFlags.SetEnableImmediateWritingOnBulkUpsert(true);
kikimr.GetTestServer().GetRuntime()->GetAppData().FeatureFlags.SetEnableWritePortionsOnInsert(true);
TLocalHelper(kikimr).CreateTestOlapTable();
Tests::NCommon::TLoggerInit(kikimr)
.SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS")
.SetPriority(NActors::NLog::PRI_DEBUG)
.Initialize();
{
auto batch = TLocalHelper(kikimr).TestArrowBatch(30000, 1000000, 11000);
TLocalHelper(kikimr).SendDataViaActorSystem("/Root/olapStore/olapTable", batch, Ydb::StatusIds::INTERNAL_ERROR);
}
}

Y_UNIT_TEST(TierDraftsGC) {
auto csController = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NKikimr::NOlap::TWaitCompactionController>();
csController->SetSmallSizeDetector(1000000);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/counters_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,6 @@ enum ETxTypes {
TXTYPE_START_INTERNAL_SCAN = 36 [(TxTypeOpts) = {Name: "TxStartInternalScan"}];
TXTYPE_DATA_SHARING_START_SOURCE_CURSOR = 37 [(TxTypeOpts) = {Name: "TxDataSharingStartSourceCursor"}];
TXTYPE_ASK_PORTION_METADATA = 38 [(TxTypeOpts) = {Name: "TxAskPortionMetadata"}];
TXTYPE_WRITE_PORTIONS_FINISHED = 39 [(TxTypeOpts) = {Name: "TxWritePortionsFinished"}];
TXTYPE_WRITE_PORTIONS_FAILED = 40 [(TxTypeOpts) = {Name: "TxWritePortionsFailed"}];
}
2 changes: 1 addition & 1 deletion ydb/core/testlib/cs_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class THelperSchemaless : public NCommon::THelper {
void CreateTestOlapStore(TActorId sender, TString scheme);
void CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme);
void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const;
void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch, const Ydb::StatusIds_StatusCode& expectedStatus = Ydb::StatusIds::SUCCESS) const;
void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch, const Ydb::StatusIds_StatusCode& expectedStatus = Ydb::StatusIds::SUCCESS) const;

virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const = 0;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,52 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
AFL_VERIFY(CommitSnapshot);
Self->OperationsManager->AddTemporaryTxLink(op->GetLockId());
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), *CommitSnapshot);
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
}
Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
}
Self->SetupCompaction(pathIds);
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
}

TTxBlobsWritingFinished::TTxBlobsWritingFinished(TColumnShard* self, const NKikimrProto::EReplyStatus writeStatus,
const std::shared_ptr<NOlap::IBlobsWritingAction>& writingActions, std::vector<TInsertedPortions>&& packs,
const std::vector<TFailedWrite>& fails)
const std::vector<TNoDataWrite>& noDataWrites)
: TBase(self, "TTxBlobsWritingFinished")
, PutBlobResult(writeStatus)
, Packs(std::move(packs))
, WritingActions(writingActions) {
Y_UNUSED(PutBlobResult);
for (auto&& i : fails) {
for (auto&& i : noDataWrites) {
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)i.GetWriteMeta().GetWriteId());
Results.emplace_back(std::move(ev), i.GetWriteMeta().GetSource(), op->GetCookie());
}
}

bool TTxBlobsWritingFailed::DoExecute(TTransactionContext& txc, const TActorContext& ctx) {
for (auto&& pack : Packs) {
const auto& writeMeta = pack.GetWriteMeta();
AFL_VERIFY(!writeMeta.HasLongTxId());
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Self->OperationsManager->AddTemporaryTxLink(op->GetLockId());
Self->OperationsManager->AbortTransactionOnExecute(*Self, op->GetLockId(), txc);

auto ev = NEvents::TDataEvents::TEvWriteResult::BuildError(Self->TabletID(), op->GetLockId(),
NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "cannot write blob: " + ::ToString(PutBlobResult));
Results.emplace_back(std::move(ev), writeMeta.GetSource(), op->GetCookie());
}
return true;
}

void TTxBlobsWritingFailed::DoComplete(const TActorContext& ctx) {
for (auto&& i : Results) {
i.DoSendReply(ctx);
Self->Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::PutBlob);
}
for (auto&& pack : Packs) {
const auto& writeMeta = pack.GetWriteMeta();
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Self->OperationsManager->AbortTransactionOnComplete(*Self, op->GetLockId());
}
}

} // namespace NKikimr::NColumnShard
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class TColumnShard;
class TTxBlobsWritingFinished: public NOlap::NDataSharing::TExtendedTransactionBase<TColumnShard> {
private:
using TBase = NOlap::NDataSharing::TExtendedTransactionBase<TColumnShard>;
const NKikimrProto::EReplyStatus PutBlobResult;
std::vector<TInsertedPortions> Packs;
const std::shared_ptr<NOlap::IBlobsWritingAction> WritingActions;
std::optional<NOlap::TSnapshot> CommitSnapshot;
Expand Down Expand Up @@ -43,12 +42,52 @@ class TTxBlobsWritingFinished: public NOlap::NDataSharing::TExtendedTransactionB
public:
TTxBlobsWritingFinished(TColumnShard* self, const NKikimrProto::EReplyStatus writeStatus,
const std::shared_ptr<NOlap::IBlobsWritingAction>& writingActions, std::vector<TInsertedPortions>&& packs,
const std::vector<TFailedWrite>& fails);
const std::vector<TNoDataWrite>& noDataWrites);

virtual bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override;
virtual void DoComplete(const TActorContext& ctx) override;
TTxType GetTxType() const override {
return TXTYPE_WRITE_PORTIONS_FINISHED;
}
};

class TTxBlobsWritingFailed: public NOlap::NDataSharing::TExtendedTransactionBase<TColumnShard> {
private:
using TBase = NOlap::NDataSharing::TExtendedTransactionBase<TColumnShard>;
const NKikimrProto::EReplyStatus PutBlobResult;
std::vector<TInsertedPortions> Packs;

class TReplyInfo {
private:
std::unique_ptr<NActors::IEventBase> Event;
TActorId DestinationForReply;
const ui64 Cookie;

public:
TReplyInfo(std::unique_ptr<NActors::IEventBase>&& ev, const TActorId& destinationForReply, const ui64 cookie)
: Event(std::move(ev))
, DestinationForReply(destinationForReply)
, Cookie(cookie) {
}

void DoSendReply(const TActorContext& ctx) {
ctx.Send(DestinationForReply, Event.release(), 0, Cookie);
}
};

std::vector<TReplyInfo> Results;

public:
TTxBlobsWritingFailed(TColumnShard* self, const NKikimrProto::EReplyStatus writeStatus, std::vector<TInsertedPortions>&& packs)
: TBase(self)
, PutBlobResult(writeStatus)
, Packs(std::move(packs)) {
}

virtual bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override;
virtual void DoComplete(const TActorContext& ctx) override;
TTxType GetTxType() const override {
return TXTYPE_WRITE;
return TXTYPE_WRITE_PORTIONS_FAILED;
}
};

Expand Down
36 changes: 25 additions & 11 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,34 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
TMemoryProfileGuard mpg("TEvWritePortionResult");
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("event", "TEvWritePortionResult");
AFL_VERIFY(ev->Get()->GetWriteStatus() == NKikimrProto::OK);
std::vector<TInsertedPortions> writtenPacks = ev->Get()->DetachInsertedPacks();
std::vector<TFailedWrite> fails = ev->Get()->DetachFails();
const TMonotonic now = TMonotonic::Now();
for (auto&& i : writtenPacks) {
Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
std::vector<TNoDataWrite> noDataWrites = ev->Get()->DetachNoDataWrites();
for (auto&& i : noDataWrites) {
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
}
for (auto&& i : fails) {
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
if (ev->Get()->GetWriteStatus() == NKikimrProto::OK) {
std::vector<TInsertedPortions> writtenPacks = ev->Get()->DetachInsertedPacks();
const TMonotonic now = TMonotonic::Now();
for (auto&& i : writtenPacks) {
Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
}
Execute(new TTxBlobsWritingFinished(
this, ev->Get()->GetWriteStatus(), ev->Get()->GetWriteAction(), std::move(writtenPacks), std::move(noDataWrites)),
ctx);
} else {
if (noDataWrites.size()) {
Execute(new TTxBlobsWritingFinished(this, NKikimrProto::OK, ev->Get()->GetWriteAction(), {}, std::move(noDataWrites)), ctx);
}

std::vector<TInsertedPortions> writtenPacks = ev->Get()->DetachInsertedPacks();
const TMonotonic now = TMonotonic::Now();
for (auto&& i : writtenPacks) {
Counters.OnWritePutBlobsFailed(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
Counters.GetCSCounters().OnWritePutBlobsFail(now - i.GetWriteMeta().GetWriteStartInstant());
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
}
Execute(new TTxBlobsWritingFailed(this, ev->Get()->GetWriteStatus(), std::move(writtenPacks)), ctx);
}
Execute(
new TTxBlobsWritingFinished(this, ev->Get()->GetWriteStatus(), ev->Get()->GetWriteAction(), std::move(writtenPacks), std::move(fails)),
ctx);
}

void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActorContext& ctx) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class TTxRemoveSharedBlobs;
class TOperationsManager;
class TWaitEraseTablesTxSubscriber;
class TTxBlobsWritingFinished;
class TTxBlobsWritingFailed;

namespace NLoading {
class TInsertTableInitializer;
Expand Down Expand Up @@ -165,6 +166,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
friend class TTxPlanStep;
friend class TTxWrite;
friend class TTxBlobsWritingFinished;
friend class TTxBlobsWritingFailed;
friend class TTxReadBase;
friend class TTxRead;
friend class TTxWriteIndex;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/counters/counters_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ class TCountersManager {
TabletCounters->OnWritePutBlobsSuccess(rowsWritten);
CSCounters.OnWritePutBlobsSuccess(d);
}

void OnWritePutBlobsFailed(const TDuration d, const ui64 /*rowsWritten*/) const {
TabletCounters->OnWriteFailure();
CSCounters.OnWritePutBlobsFail(d);
}
};

} // namespace NKikimr::NColumnShard
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/hooks/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ class ICSController {
const std::set<NOlap::TSnapshot>& /*snapshotsToSave*/, const std::set<NOlap::TSnapshot>& /*snapshotsToRemove*/) {
}

virtual NKikimrProto::EReplyStatus OverrideBlobPutResultOnWrite(const NKikimrProto::EReplyStatus originalStatus) const {
return originalStatus;
}

ui64 GetMemoryLimitScanPortion() const {
return DoGetMemoryLimitScanPortion(GetConfig().GetMemoryLimitScanPortion());
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/hooks/testing/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class TController: public TReadOnlyController {
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideTasksActualizationLag);
YDB_ACCESSOR_DEF(std::optional<TDuration>, OverrideReadTimeoutClean);
YDB_ACCESSOR(std::optional<ui64>, OverrideMemoryLimitForPortionReading, 100);
YDB_ACCESSOR_DEF(std::optional<NKikimrProto::EReplyStatus>, OverrideBlobPutResultOnWriteValue);

EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force;

YDB_ACCESSOR(std::optional<ui64>, OverrideReduceMemoryIntervalLimit, 1024);
Expand Down Expand Up @@ -202,6 +204,10 @@ class TController: public TReadOnlyController {
}

public:
virtual NKikimrProto::EReplyStatus OverrideBlobPutResultOnWrite(const NKikimrProto::EReplyStatus originalStatus) const override {
return OverrideBlobPutResultOnWriteValue.value_or(originalStatus);
}

const TAtomicCounter& GetIndexWriteControllerBrokeCount() const {
return IndexWriteControllerBrokeCount;
}
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/tx/columnshard/operations/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class TInsertedPortions {
}
};

class TFailedWrite {
class TNoDataWrite {
private:
NEvWrite::TWriteMeta WriteMeta;
YDB_READONLY(ui64, DataSize, 0);
Expand All @@ -73,7 +73,7 @@ class TFailedWrite {
return WriteMeta;
}

TFailedWrite(const NEvWrite::TWriteMeta& writeMeta, const ui64 dataSize)
TNoDataWrite(const NEvWrite::TWriteMeta& writeMeta, const ui64 dataSize)
: WriteMeta(writeMeta)
, DataSize(dataSize) {
AFL_VERIFY(!WriteMeta.HasLongTxId());
Expand All @@ -89,22 +89,22 @@ class TEvWritePortionResult: public TEventLocal<TEvWritePortionResult, TEvPrivat
YDB_READONLY_DEF(NKikimrProto::EReplyStatus, WriteStatus);
YDB_READONLY_DEF(std::shared_ptr<NOlap::IBlobsWritingAction>, WriteAction);
std::vector<TInsertedPortions> InsertedPacks;
std::vector<TFailedWrite> Fails;
std::vector<TNoDataWrite> NoData;

public:
std::vector<TInsertedPortions>&& DetachInsertedPacks() {
return std::move(InsertedPacks);
}
std::vector<TFailedWrite>&& DetachFails() {
return std::move(Fails);
std::vector<TNoDataWrite>&& DetachNoDataWrites() {
return std::move(NoData);
}

TEvWritePortionResult(const NKikimrProto::EReplyStatus writeStatus, const std::shared_ptr<NOlap::IBlobsWritingAction>& writeAction,
std::vector<TInsertedPortions>&& portions, std::vector<TFailedWrite>&& fails)
std::vector<TInsertedPortions>&& portions, std::vector<TNoDataWrite>&& noData)
: WriteStatus(writeStatus)
, WriteAction(writeAction)
, InsertedPacks(portions)
, Fails(fails) {
, NoData(noData) {
}
};

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/operations/slice_builder/builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ class TPortionWriteController: public NColumnShard::IWriteController,
const ui64 DataSize;
void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override {
std::vector<NColumnShard::TInsertedPortion> portions;
std::vector<NColumnShard::TFailedWrite> fails;
std::vector<NColumnShard::TNoDataWrite> noDataWrites;
for (auto&& i : Portions) {
portions.emplace_back(i.ExtractPortion(), i.GetPKBatch());
}
NColumnShard::TInsertedPortions pack(std::move(WriteMeta), std::move(portions), DataSize);
std::vector<NColumnShard::TInsertedPortions> packs = { pack };
auto result = std::make_unique<NColumnShard::NPrivateEvents::NWrite::TEvWritePortionResult>(
putResult->GetPutStatus(), Action, std::move(packs), std::move(fails));
putResult->GetPutStatus(), Action, std::move(packs), std::move(noDataWrites));
ctx.Send(DstActor, result.release());
}
virtual void DoOnStartSending() override {
Expand Down Expand Up @@ -119,9 +119,9 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*ta
if (WriteData.GetWritePortions()) {
if (OriginalBatch->num_rows() == 0) {
std::vector<NColumnShard::TInsertedPortions> portions;
std::vector<NColumnShard::TFailedWrite> fails = { NColumnShard::TFailedWrite(WriteData.GetWriteMeta(), WriteData.GetSize()) };
std::vector<NColumnShard::TNoDataWrite> noDataWrites = { NColumnShard::TNoDataWrite(WriteData.GetWriteMeta(), WriteData.GetSize()) };
auto result = std::make_unique<NColumnShard::NPrivateEvents::NWrite::TEvWritePortionResult>(
NKikimrProto::EReplyStatus::OK, nullptr, std::move(portions), std::move(fails));
NKikimrProto::EReplyStatus::OK, nullptr, std::move(portions), std::move(noDataWrites));
NActors::TActivationContext::AsActorContext().Send(Context.GetTabletActorId(), result.release());
} else {
auto batches = NArrow::NMerger::TRWSortableBatchPosition::SplitByBordersInIntervalPositions(OriginalBatch,
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/operations/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ void TWriteOperation::FromProto(const NKikimrTxColumnShard::TInternalOperationDa
}

void TWriteOperation::AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const {
Y_ABORT_UNLESS(Status == EOperationStatus::Prepared);
Y_ABORT_UNLESS(Status != EOperationStatus::Draft);

TBlobGroupSelector dsGroupSelector(owner.Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
Expand All @@ -144,7 +144,7 @@ void TWriteOperation::AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::T
}

void TWriteOperation::AbortOnComplete(TColumnShard& owner) const {
Y_ABORT_UNLESS(Status == EOperationStatus::Prepared);
Y_ABORT_UNLESS(Status != EOperationStatus::Draft);
if (WritePortions) {
for (auto&& i : InsertWriteIds) {
owner.MutableIndexAs<NOlap::TColumnEngineForLogs>().MutableGranuleVerified(PathId).AbortPortionOnComplete(
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class TWriteActor: public TActorBootstrapped<TWriteActor>, public TMonitoringObj
YellowStopChannels.insert(msg->Id.Channel());
}

status = NYDBTest::TControllers::GetColumnShardController()->OverrideBlobPutResultOnWrite(status);

if (status != NKikimrProto::OK) {
ACFL_ERROR("event", "TEvPutResult")("blob_id", msg->Id.ToString())("status", status)("error", msg->ErrorReason);
WriteController->Abort("cannot write blob " + msg->Id.ToString() + ", status: " + ::ToString(status) + ". reason: " + msg->ErrorReason);
Expand Down

0 comments on commit e92e224

Please sign in to comment.