diff --git a/ydb/core/kqp/ut/olap/write_ut.cpp b/ydb/core/kqp/ut/olap/write_ut.cpp index 88b349912988..0784cfa235af 100644 --- a/ydb/core/kqp/ut/olap/write_ut.cpp +++ b/ydb/core/kqp/ut/olap/write_ut.cpp @@ -14,6 +14,29 @@ namespace NKikimr::NKqp { Y_UNIT_TEST_SUITE(KqpOlapWrite) { + Y_UNIT_TEST(WriteFails) { + auto csController = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); + csController->SetSmallSizeDetector(1000000); + csController->SetIndexWriteControllerEnabled(false); + csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + csController->SetOverrideBlobPutResultOnWriteValue(NKikimrProto::EReplyStatus::BLOCKED); + Singleton()->ResetWriteCounters(); + + auto settings = TKikimrSettings().SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + kikimr.GetTestServer().GetRuntime()->GetAppData().FeatureFlags.SetEnableImmediateWritingOnBulkUpsert(true); + kikimr.GetTestServer().GetRuntime()->GetAppData().FeatureFlags.SetEnableWritePortionsOnInsert(true); + TLocalHelper(kikimr).CreateTestOlapTable(); + Tests::NCommon::TLoggerInit(kikimr) + .SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS") + .SetPriority(NActors::NLog::PRI_DEBUG) + .Initialize(); + { + auto batch = TLocalHelper(kikimr).TestArrowBatch(30000, 1000000, 11000); + TLocalHelper(kikimr).SendDataViaActorSystem("/Root/olapStore/olapTable", batch, Ydb::StatusIds::INTERNAL_ERROR); + } + } + Y_UNIT_TEST(TierDraftsGC) { auto csController = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); csController->SetSmallSizeDetector(1000000); diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto index 8997d5d38d49..327a8c999f5f 100644 --- a/ydb/core/protos/counters_columnshard.proto +++ b/ydb/core/protos/counters_columnshard.proto @@ -203,4 +203,6 @@ enum ETxTypes { TXTYPE_START_INTERNAL_SCAN = 36 [(TxTypeOpts) = {Name: "TxStartInternalScan"}]; TXTYPE_DATA_SHARING_START_SOURCE_CURSOR = 37 [(TxTypeOpts) = {Name: "TxDataSharingStartSourceCursor"}]; TXTYPE_ASK_PORTION_METADATA = 38 [(TxTypeOpts) = {Name: "TxAskPortionMetadata"}]; + TXTYPE_WRITE_PORTIONS_FINISHED = 39 [(TxTypeOpts) = {Name: "TxWritePortionsFinished"}]; + TXTYPE_WRITE_PORTIONS_FAILED = 40 [(TxTypeOpts) = {Name: "TxWritePortionsFailed"}]; } diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h index 1553fbc9859c..832efa1c321d 100644 --- a/ydb/core/testlib/cs_helper.h +++ b/ydb/core/testlib/cs_helper.h @@ -15,7 +15,7 @@ class THelperSchemaless : public NCommon::THelper { void CreateTestOlapStore(TActorId sender, TString scheme); void CreateTestOlapTable(TActorId sender, TString storeOrDirName, TString scheme); void SendDataViaActorSystem(TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const; - void SendDataViaActorSystem(TString testTable, std::shared_ptr batch, const Ydb::StatusIds_StatusCode& expectedStatus = Ydb::StatusIds::SUCCESS) const; + void SendDataViaActorSystem(TString testTable, std::shared_ptr batch, const Ydb::StatusIds_StatusCode& expectedStatus = Ydb::StatusIds::SUCCESS) const; virtual std::shared_ptr TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui32 tsStepUs = 1) const = 0; }; diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp index d12c7df04f81..b365f542df73 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp @@ -105,27 +105,52 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) { AFL_VERIFY(CommitSnapshot); Self->OperationsManager->AddTemporaryTxLink(op->GetLockId()); Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), *CommitSnapshot); + Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED); } Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant()); Self->Counters.GetCSCounters().OnSuccessWriteResponse(); } Self->SetupCompaction(pathIds); - Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED); } TTxBlobsWritingFinished::TTxBlobsWritingFinished(TColumnShard* self, const NKikimrProto::EReplyStatus writeStatus, const std::shared_ptr& writingActions, std::vector&& packs, - const std::vector& fails) + const std::vector& noDataWrites) : TBase(self, "TTxBlobsWritingFinished") - , PutBlobResult(writeStatus) , Packs(std::move(packs)) , WritingActions(writingActions) { - Y_UNUSED(PutBlobResult); - for (auto&& i : fails) { + for (auto&& i : noDataWrites) { auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID()); auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)i.GetWriteMeta().GetWriteId()); Results.emplace_back(std::move(ev), i.GetWriteMeta().GetSource(), op->GetCookie()); } } +bool TTxBlobsWritingFailed::DoExecute(TTransactionContext& txc, const TActorContext& ctx) { + for (auto&& pack : Packs) { + const auto& writeMeta = pack.GetWriteMeta(); + AFL_VERIFY(!writeMeta.HasLongTxId()); + auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId()); + Self->OperationsManager->AddTemporaryTxLink(op->GetLockId()); + Self->OperationsManager->AbortTransactionOnExecute(*Self, op->GetLockId(), txc); + + auto ev = NEvents::TDataEvents::TEvWriteResult::BuildError(Self->TabletID(), op->GetLockId(), + NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "cannot write blob: " + ::ToString(PutBlobResult)); + Results.emplace_back(std::move(ev), writeMeta.GetSource(), op->GetCookie()); + } + return true; +} + +void TTxBlobsWritingFailed::DoComplete(const TActorContext& ctx) { + for (auto&& i : Results) { + i.DoSendReply(ctx); + Self->Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::PutBlob); + } + for (auto&& pack : Packs) { + const auto& writeMeta = pack.GetWriteMeta(); + auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId()); + Self->OperationsManager->AbortTransactionOnComplete(*Self, op->GetLockId()); + } +} + } // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.h index 531b86385933..d758031bd763 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.h +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.h @@ -15,7 +15,6 @@ class TColumnShard; class TTxBlobsWritingFinished: public NOlap::NDataSharing::TExtendedTransactionBase { private: using TBase = NOlap::NDataSharing::TExtendedTransactionBase; - const NKikimrProto::EReplyStatus PutBlobResult; std::vector Packs; const std::shared_ptr WritingActions; std::optional CommitSnapshot; @@ -43,12 +42,52 @@ class TTxBlobsWritingFinished: public NOlap::NDataSharing::TExtendedTransactionB public: TTxBlobsWritingFinished(TColumnShard* self, const NKikimrProto::EReplyStatus writeStatus, const std::shared_ptr& writingActions, std::vector&& packs, - const std::vector& fails); + const std::vector& noDataWrites); + + virtual bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override; + virtual void DoComplete(const TActorContext& ctx) override; + TTxType GetTxType() const override { + return TXTYPE_WRITE_PORTIONS_FINISHED; + } +}; + +class TTxBlobsWritingFailed: public NOlap::NDataSharing::TExtendedTransactionBase { +private: + using TBase = NOlap::NDataSharing::TExtendedTransactionBase; + const NKikimrProto::EReplyStatus PutBlobResult; + std::vector Packs; + + class TReplyInfo { + private: + std::unique_ptr Event; + TActorId DestinationForReply; + const ui64 Cookie; + + public: + TReplyInfo(std::unique_ptr&& ev, const TActorId& destinationForReply, const ui64 cookie) + : Event(std::move(ev)) + , DestinationForReply(destinationForReply) + , Cookie(cookie) { + } + + void DoSendReply(const TActorContext& ctx) { + ctx.Send(DestinationForReply, Event.release(), 0, Cookie); + } + }; + + std::vector Results; + +public: + TTxBlobsWritingFailed(TColumnShard* self, const NKikimrProto::EReplyStatus writeStatus, std::vector&& packs) + : TBase(self) + , PutBlobResult(writeStatus) + , Packs(std::move(packs)) { + } virtual bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) override; virtual void DoComplete(const TActorContext& ctx) override; TTxType GetTxType() const override { - return TXTYPE_WRITE; + return TXTYPE_WRITE_PORTIONS_FAILED; } }; diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index ab1dc52fa3b1..6c3c5478e12d 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -91,20 +91,34 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e TMemoryProfileGuard mpg("TEvWritePortionResult"); NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("event", "TEvWritePortionResult"); - AFL_VERIFY(ev->Get()->GetWriteStatus() == NKikimrProto::OK); - std::vector writtenPacks = ev->Get()->DetachInsertedPacks(); - std::vector fails = ev->Get()->DetachFails(); - const TMonotonic now = TMonotonic::Now(); - for (auto&& i : writtenPacks) { - Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount()); + std::vector noDataWrites = ev->Get()->DetachNoDataWrites(); + for (auto&& i : noDataWrites) { Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1); } - for (auto&& i : fails) { - Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1); + if (ev->Get()->GetWriteStatus() == NKikimrProto::OK) { + std::vector writtenPacks = ev->Get()->DetachInsertedPacks(); + const TMonotonic now = TMonotonic::Now(); + for (auto&& i : writtenPacks) { + Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount()); + Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1); + } + Execute(new TTxBlobsWritingFinished( + this, ev->Get()->GetWriteStatus(), ev->Get()->GetWriteAction(), std::move(writtenPacks), std::move(noDataWrites)), + ctx); + } else { + if (noDataWrites.size()) { + Execute(new TTxBlobsWritingFinished(this, NKikimrProto::OK, ev->Get()->GetWriteAction(), {}, std::move(noDataWrites)), ctx); + } + + std::vector writtenPacks = ev->Get()->DetachInsertedPacks(); + const TMonotonic now = TMonotonic::Now(); + for (auto&& i : writtenPacks) { + Counters.OnWritePutBlobsFailed(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount()); + Counters.GetCSCounters().OnWritePutBlobsFail(now - i.GetWriteMeta().GetWriteStartInstant()); + Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1); + } + Execute(new TTxBlobsWritingFailed(this, ev->Get()->GetWriteStatus(), std::move(writtenPacks)), ctx); } - Execute( - new TTxBlobsWritingFinished(this, ev->Get()->GetWriteStatus(), ev->Get()->GetWriteAction(), std::move(writtenPacks), std::move(fails)), - ctx); } void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index f725dae4eab8..c39861fdddd9 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -99,6 +99,7 @@ class TTxRemoveSharedBlobs; class TOperationsManager; class TWaitEraseTablesTxSubscriber; class TTxBlobsWritingFinished; +class TTxBlobsWritingFailed; namespace NLoading { class TInsertTableInitializer; @@ -165,6 +166,7 @@ class TColumnShard: public TActor, public NTabletFlatExecutor::TTa friend class TTxPlanStep; friend class TTxWrite; friend class TTxBlobsWritingFinished; + friend class TTxBlobsWritingFailed; friend class TTxReadBase; friend class TTxRead; friend class TTxWriteIndex; diff --git a/ydb/core/tx/columnshard/counters/counters_manager.h b/ydb/core/tx/columnshard/counters/counters_manager.h index 17336ca3410d..9939191104f5 100644 --- a/ydb/core/tx/columnshard/counters/counters_manager.h +++ b/ydb/core/tx/columnshard/counters/counters_manager.h @@ -92,6 +92,11 @@ class TCountersManager { TabletCounters->OnWritePutBlobsSuccess(rowsWritten); CSCounters.OnWritePutBlobsSuccess(d); } + + void OnWritePutBlobsFailed(const TDuration d, const ui64 /*rowsWritten*/) const { + TabletCounters->OnWriteFailure(); + CSCounters.OnWritePutBlobsFail(d); + } }; } // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index cc3856edface..506020186346 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -149,6 +149,10 @@ class ICSController { const std::set& /*snapshotsToSave*/, const std::set& /*snapshotsToRemove*/) { } + virtual NKikimrProto::EReplyStatus OverrideBlobPutResultOnWrite(const NKikimrProto::EReplyStatus originalStatus) const { + return originalStatus; + } + ui64 GetMemoryLimitScanPortion() const { return DoGetMemoryLimitScanPortion(GetConfig().GetMemoryLimitScanPortion()); } diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index 7b2028813412..d57470a0a93b 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -23,6 +23,8 @@ class TController: public TReadOnlyController { YDB_ACCESSOR_DEF(std::optional, OverrideTasksActualizationLag); YDB_ACCESSOR_DEF(std::optional, OverrideReadTimeoutClean); YDB_ACCESSOR(std::optional, OverrideMemoryLimitForPortionReading, 100); + YDB_ACCESSOR_DEF(std::optional, OverrideBlobPutResultOnWriteValue); + EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force; YDB_ACCESSOR(std::optional, OverrideReduceMemoryIntervalLimit, 1024); @@ -202,6 +204,10 @@ class TController: public TReadOnlyController { } public: + virtual NKikimrProto::EReplyStatus OverrideBlobPutResultOnWrite(const NKikimrProto::EReplyStatus originalStatus) const override { + return OverrideBlobPutResultOnWriteValue.value_or(originalStatus); + } + const TAtomicCounter& GetIndexWriteControllerBrokeCount() const { return IndexWriteControllerBrokeCount; } diff --git a/ydb/core/tx/columnshard/operations/events.h b/ydb/core/tx/columnshard/operations/events.h index affceeb82b3f..b2d5bd9b8e93 100644 --- a/ydb/core/tx/columnshard/operations/events.h +++ b/ydb/core/tx/columnshard/operations/events.h @@ -63,7 +63,7 @@ class TInsertedPortions { } }; -class TFailedWrite { +class TNoDataWrite { private: NEvWrite::TWriteMeta WriteMeta; YDB_READONLY(ui64, DataSize, 0); @@ -73,7 +73,7 @@ class TFailedWrite { return WriteMeta; } - TFailedWrite(const NEvWrite::TWriteMeta& writeMeta, const ui64 dataSize) + TNoDataWrite(const NEvWrite::TWriteMeta& writeMeta, const ui64 dataSize) : WriteMeta(writeMeta) , DataSize(dataSize) { AFL_VERIFY(!WriteMeta.HasLongTxId()); @@ -89,22 +89,22 @@ class TEvWritePortionResult: public TEventLocal, WriteAction); std::vector InsertedPacks; - std::vector Fails; + std::vector NoData; public: std::vector&& DetachInsertedPacks() { return std::move(InsertedPacks); } - std::vector&& DetachFails() { - return std::move(Fails); + std::vector&& DetachNoDataWrites() { + return std::move(NoData); } TEvWritePortionResult(const NKikimrProto::EReplyStatus writeStatus, const std::shared_ptr& writeAction, - std::vector&& portions, std::vector&& fails) + std::vector&& portions, std::vector&& noData) : WriteStatus(writeStatus) , WriteAction(writeAction) , InsertedPacks(portions) - , Fails(fails) { + , NoData(noData) { } }; diff --git a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp index bd4666679c57..84def7cbdc04 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp +++ b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp @@ -76,14 +76,14 @@ class TPortionWriteController: public NColumnShard::IWriteController, const ui64 DataSize; void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override { std::vector portions; - std::vector fails; + std::vector noDataWrites; for (auto&& i : Portions) { portions.emplace_back(i.ExtractPortion(), i.GetPKBatch()); } NColumnShard::TInsertedPortions pack(std::move(WriteMeta), std::move(portions), DataSize); std::vector packs = { pack }; auto result = std::make_unique( - putResult->GetPutStatus(), Action, std::move(packs), std::move(fails)); + putResult->GetPutStatus(), Action, std::move(packs), std::move(noDataWrites)); ctx.Send(DstActor, result.release()); } virtual void DoOnStartSending() override { @@ -119,9 +119,9 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr& /*ta if (WriteData.GetWritePortions()) { if (OriginalBatch->num_rows() == 0) { std::vector portions; - std::vector fails = { NColumnShard::TFailedWrite(WriteData.GetWriteMeta(), WriteData.GetSize()) }; + std::vector noDataWrites = { NColumnShard::TNoDataWrite(WriteData.GetWriteMeta(), WriteData.GetSize()) }; auto result = std::make_unique( - NKikimrProto::EReplyStatus::OK, nullptr, std::move(portions), std::move(fails)); + NKikimrProto::EReplyStatus::OK, nullptr, std::move(portions), std::move(noDataWrites)); NActors::TActivationContext::AsActorContext().Send(Context.GetTabletActorId(), result.release()); } else { auto batches = NArrow::NMerger::TRWSortableBatchPosition::SplitByBordersInIntervalPositions(OriginalBatch, diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index dd823078dd31..3c54c7d01c09 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -125,7 +125,7 @@ void TWriteOperation::FromProto(const NKikimrTxColumnShard::TInternalOperationDa } void TWriteOperation::AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const { - Y_ABORT_UNLESS(Status == EOperationStatus::Prepared); + Y_ABORT_UNLESS(Status != EOperationStatus::Draft); TBlobGroupSelector dsGroupSelector(owner.Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); @@ -142,7 +142,7 @@ void TWriteOperation::AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::T } void TWriteOperation::AbortOnComplete(TColumnShard& owner) const { - Y_ABORT_UNLESS(Status == EOperationStatus::Prepared); + Y_ABORT_UNLESS(Status != EOperationStatus::Draft); if (WritePortions) { for (auto&& i : InsertWriteIds) { owner.MutableIndexAs().MutableGranuleVerified(PathId).AbortPortionOnComplete( diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp index 25cbb99b5915..7478eb1120df 100644 --- a/ydb/core/tx/columnshard/write_actor.cpp +++ b/ydb/core/tx/columnshard/write_actor.cpp @@ -34,6 +34,8 @@ class TWriteActor: public TActorBootstrapped, public TMonitoringObj YellowStopChannels.insert(msg->Id.Channel()); } + status = NYDBTest::TControllers::GetColumnShardController()->OverrideBlobPutResultOnWrite(status); + if (status != NKikimrProto::OK) { ACFL_ERROR("event", "TEvPutResult")("blob_id", msg->Id.ToString())("status", status)("error", msg->ErrorReason); WriteController->Abort("cannot write blob " + msg->Id.ToString() + ", status: " + ::ToString(status) + ". reason: " + msg->ErrorReason);