Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix write ids usage #8909

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.*
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
ydb/core/kqp/ut/olap KqpOlap.TableSinkWithOlapStore
ydb/core/kqp/ut/pg KqpPg.CreateIndex
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2897,13 +2897,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

WriteTestData(kikimr, "/Root/olapStore/olapTable0", 0, 1000000, 3, true);


auto client = kikimr.GetQueryClient();
{
auto result = client.ExecuteQuery(R"(
SELECT * FROM `/Root/olapStore/olapTable0` ORDER BY timestamp;
INSERT INTO `/Root/olapStore/olapTable1` SELECT * FROM `/Root/olapStore/olapTable0`;
INSERT INTO `/Root/olapStore/olapTable0` SELECT * FROM `/Root/olapStore/olapTable1`;
REPLACE INTO `/Root/olapStore/olapTable0` SELECT * FROM `/Root/olapStore/olapTable1`;
SELECT * FROM `/Root/olapStore/olapTable1` ORDER BY timestamp;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
namespace NKikimr::NColumnShard {
class TTxInsertTableCleanup: public TTransactionBase<TColumnShard> {
private:
THashSet<TWriteId> WriteIdsToAbort;
THashSet<TInsertWriteId> WriteIdsToAbort;
std::shared_ptr<NOlap::IBlobsDeclareRemovingAction> BlobsAction;
public:
TTxInsertTableCleanup(TColumnShard* self, THashSet<TWriteId>&& writeIdsToAbort)
TTxInsertTableCleanup(TColumnShard* self, THashSet<TInsertWriteId>&& writeIdsToAbort)
: TBase(self)
, WriteIdsToAbort(std::move(writeIdsToAbort)) {
Y_ABORT_UNLESS(WriteIdsToAbort.size() || self->InsertTable->GetAborted().size());
Expand Down
64 changes: 31 additions & 33 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include "tx_write.h"

#include <ydb/core/tx/columnshard/engines/insert_table/user_data.h>
#include <ydb/core/tx/columnshard/transactions/locks/write.h>

namespace NKikimr::NColumnShard {

bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) {
bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) {
NKikimrTxColumnShard::TLogicalMetadata meta;
meta.SetNumRows(batch->GetRowsCount());
meta.SetRawBytes(batch->GetRawBytes());
Expand All @@ -23,9 +25,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
auto schemeVersion = batch.GetAggregation().GetSchemaVersion();
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);

NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange,
meta, tableSchema->GetVersion(),
batch->GetData());
auto userData = std::make_shared<NOlap::TUserData>(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData());
NOlap::TInsertedData insertData(writeId, userData);
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
Self->UpdateInsertTableCounters();
Expand All @@ -36,7 +37,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali

bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
TMemoryProfileGuard mpg("TTxWrite::Execute");
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
ACFL_DEBUG("event", "start_execute");
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
for (auto&& aggr : buffer.GetAggregations()) {
Expand All @@ -45,33 +47,27 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
if (writeMeta.HasLongTxId()) {
NIceDb::TNiceDb db(txc.DB);
const TInsertWriteId insertWriteId =
Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId(), writeMeta.GetGranuleShardingVersion());
aggr->AddInsertWriteId(insertWriteId);
if (writeMeta.IsGuaranteeWriter()) {
AFL_VERIFY(aggr->GetSplittedBlobs().size() == 1)("count", aggr->GetSplittedBlobs().size());
} else {
AFL_VERIFY(aggr->GetSplittedBlobs().size() <= 1)("count", aggr->GetSplittedBlobs().size());
}
if (aggr->GetSplittedBlobs().size() == 1) {
AFL_VERIFY(InsertOneBlob(txc, aggr->GetSplittedBlobs().front(), insertWriteId))("write_id", writeMeta.GetWriteId())(
"insert_write_id", insertWriteId);
}
} else {
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
}

auto writeId = TWriteId(writeMeta.GetWriteId());
if (!operation) {
NIceDb::TNiceDb db(txc.DB);
writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId(), writeMeta.GetGranuleShardingVersion());
aggr->AddWriteId(writeId);
}

for (auto&& i : aggr->GetSplittedBlobs()) {
if (operation) {
writeId = Self->BuildNextWriteId(txc);
aggr->AddWriteId(writeId);
}

if (!InsertOneBlob(txc, i, writeId)) {
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_DUPLICATE);
for (auto&& i : aggr->GetSplittedBlobs()) {
const TInsertWriteId insertWriteId = Self->InsertTable->BuildNextWriteId(txc);
aggr->AddInsertWriteId(insertWriteId);
AFL_VERIFY(InsertOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)(
"size", aggr->GetSplittedBlobs().size());
}
}
}
Expand All @@ -88,9 +84,10 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteMeta();
if (!writeMeta.HasLongTxId()) {
auto operation = Self->OperationsManager->GetOperationVerified((TWriteId)writeMeta.GetWriteId());
auto operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
operation->OnWriteFinish(txc, aggr->GetWriteIds(), operation->GetBehaviour() == EOperationBehaviour::NoTxWrite);
operation->OnWriteFinish(txc, aggr->GetInsertWriteIds(), operation->GetBehaviour() == EOperationBehaviour::NoTxWrite);
Self->OperationsManager->LinkInsertWriteIdToOperationWriteId(aggr->GetInsertWriteIds(), operation->GetWriteId());
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
Expand Down Expand Up @@ -119,8 +116,9 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
}
} else {
Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1);
auto ev = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
Y_ABORT_UNLESS(aggr->GetInsertWriteIds().size() == 1);
auto ev = std::make_unique<TEvColumnShard::TEvWriteResult>(
Self->TabletID(), writeMeta, (ui64)aggr->GetInsertWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
Results.emplace_back(std::move(ev), writeMeta.GetSource(), 0);
}
}
Expand All @@ -129,7 +127,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {

void TTxWrite::Complete(const TActorContext& ctx) {
TMemoryProfileGuard mpg("TTxWrite::Complete");
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "complete");
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "complete");
const auto now = TMonotonic::Now();
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
for (auto&& i : buffer.GetAddActions()) {
Expand All @@ -149,7 +148,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteMeta();
if (!writeMeta.HasLongTxId()) {
auto op = Self->GetOperationsManager().GetOperationVerified(NOlap::TWriteId(writeMeta.GetWriteId()));
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock || op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(writeMeta.GetTableId(),
buffer.GetAggregations()[i]->GetRecordBatch(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
Expand All @@ -158,12 +157,11 @@ void TTxWrite::Complete(const TActorContext& ctx) {
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), Self->GetLastTxSnapshot());
}

}
Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
}
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
}

}
} // namespace NKikimr::NColumnShard
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
std::vector<std::shared_ptr<TTxController::ITransactionOperator>> ResultOperators;


bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId);
bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId);

TStringBuilder TxPrefix() const {
return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] ";
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ void TColumnShard::UpdateIndexCounters() {

ui64 TColumnShard::MemoryUsage() const {
ui64 memory = ProgressTxController->GetMemoryUsage() + ScanTxInFlight.size() * (sizeof(ui64) + sizeof(TInstant)) +
LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) +
LongTxWrites.size() * (sizeof(TInsertWriteId) + sizeof(TLongTxWriteInfo)) +
LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) +
(WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) +
Counters.GetTabletCounters()->GetValue(COUNTER_PREPARED_RECORDS) * sizeof(NOlap::TInsertedData) +
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ void TTxInit::SetDefaults() {
Self->CurrentSchemeShardId = 0;
Self->LastSchemaSeqNo = { };
Self->ProcessingParams.reset();
Self->LastWriteId = TWriteId{0};
Self->LastPlannedStep = 0;
Self->LastPlannedTxId = 0;
Self->LastCompletedTx = NOlap::TSnapshot::Zero();
Expand Down Expand Up @@ -73,7 +72,6 @@ bool TTxInit::Precharge(TTransactionContext& txc) {
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastSchemaSeqNoGeneration, Self->LastSchemaSeqNo.Generation);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastSchemaSeqNoRound, Self->LastSchemaSeqNo.Round);
ready = ready && Schema::GetSpecialProtoValue(db, Schema::EValueIds::ProcessingParams, Self->ProcessingParams);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastWriteId, Self->LastWriteId);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastPlannedStep, Self->LastPlannedStep);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastPlannedTxId, Self->LastPlannedTxId);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo);
Expand Down Expand Up @@ -107,7 +105,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
ACFL_DEBUG("step", "TInsertTable::Load_Start");
TMemoryProfileGuard g("TTxInit/InsertTable");
auto localInsertTable = std::make_unique<NOlap::TInsertTable>();
if (!localInsertTable->Load(dbTable, TAppData::TimeProvider->Now())) {
if (!localInsertTable->Load(db, dbTable, TAppData::TimeProvider->Now())) {
ACFL_ERROR("step", "TInsertTable::Load_Fails");
return false;
}
Expand Down Expand Up @@ -182,7 +180,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}

while (!rowset.EndOfSet()) {
const TWriteId writeId = TWriteId{ rowset.GetValue<Schema::LongTxWrites::WriteId>() };
const TInsertWriteId writeId = (TInsertWriteId)rowset.GetValue<Schema::LongTxWrites::WriteId>();
const ui32 writePartId = rowset.GetValue<Schema::LongTxWrites::WritePartId>();
NKikimrLongTxService::TLongTxId proto;
Y_ABORT_UNLESS(proto.ParseFromString(rowset.GetValue<Schema::LongTxWrites::LongTxId>()));
Expand Down
24 changes: 13 additions & 11 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode);
ctx.Send(writeMeta.GetSource(), result.release());
} else {
auto operation = OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
auto operation = OperationsManager->GetOperation((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetLockId(),
ev->Get()->GetWriteResultStatus(), ev->Get()->GetErrorMessage() ? ev->Get()->GetErrorMessage() : "put data fails");
Expand Down Expand Up @@ -289,7 +289,7 @@ class TCommitOperation {
}

TConclusionStatus Parse(const NEvents::TDataEvents::TEvWrite& evWrite) {
AFL_VERIFY(evWrite.Record.GetLocks().GetLocks().size() == 1);
AFL_VERIFY(evWrite.Record.GetLocks().GetLocks().size() >= 1);
auto& locks = evWrite.Record.GetLocks();
auto& lock = evWrite.Record.GetLocks().GetLocks()[0];
SendingShards = std::set<ui64>(locks.GetSendingShards().begin(), locks.GetSendingShards().end());
Expand Down Expand Up @@ -324,7 +324,8 @@ class TCommitOperation {
return TConclusionStatus::Success();
}

std::unique_ptr<NColumnShard::TEvWriteCommitSyncTransactionOperator> CreateTxOperator(const NKikimrTxColumnShard::ETransactionKind kind) const {
std::unique_ptr<NColumnShard::TEvWriteCommitSyncTransactionOperator> CreateTxOperator(
const NKikimrTxColumnShard::ETransactionKind kind) const {
AFL_VERIFY(ReceivingShards.size());
if (IsPrimary()) {
return std::make_unique<NColumnShard::TEvWriteCommitPrimaryTransactionOperator>(
Expand Down Expand Up @@ -428,15 +429,16 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
const auto& record = ev->Get()->Record;
const auto source = ev->Sender;
const auto cookie = ev->Cookie;
const auto behaviour = TOperationsManager::GetBehaviour(*ev->Get());
// AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString());
if (behaviour == EOperationBehaviour::Undefined) {
const auto behaviourConclusion = TOperationsManager::GetBehaviour(*ev->Get());
// AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString());
if (behaviourConclusion.IsFail()) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "invalid write event");
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST,
"invalid write event: " + behaviourConclusion.GetErrorMessage());
ctx.Send(source, result.release(), 0, cookie);
return;
}
auto behaviour = *behaviourConclusion;

if (behaviour == EOperationBehaviour::AbortWriteLock) {
Execute(new TAbortWriteTransaction(this, record.GetLocks().GetLocks()[0].GetLockId(), source, cookie), ctx);
Expand All @@ -447,8 +449,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
auto commitOperation = std::make_shared<TCommitOperation>(TabletID());
const auto sendError = [&](const TString& message, const NKikimrDataEvents::TEvWriteResult::EStatus status) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result =
NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, status, message);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, status, message);
ctx.Send(source, result.release(), 0, cookie);
};
auto conclusionParse = commitOperation->Parse(*ev->Get());
Expand All @@ -466,7 +467,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
" != " + ::ToString(commitOperation->GetGeneration()),
NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN);
} else if (lockInfo->GetInternalGenerationCounter() != commitOperation->GetInternalGenerationCounter()) {
sendError("tablet lock have another internal generation counter: " + ::ToString(lockInfo->GetInternalGenerationCounter()) +
sendError(
"tablet lock have another internal generation counter: " + ::ToString(lockInfo->GetInternalGenerationCounter()) +
" != " + ::ToString(commitOperation->GetInternalGenerationCounter()),
NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN);
} else {
Expand Down
18 changes: 0 additions & 18 deletions ydb/core/tx/columnshard/columnshard_common.cpp

This file was deleted.

Loading
Loading