Skip to content

Commit

Permalink
Merge b6a02fa into c0d60de
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 8, 2024
2 parents c0d60de + b6a02fa commit 71659be
Show file tree
Hide file tree
Showing 50 changed files with 727 additions and 712 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
namespace NKikimr::NColumnShard {
class TTxInsertTableCleanup: public TTransactionBase<TColumnShard> {
private:
THashSet<TWriteId> WriteIdsToAbort;
THashSet<TInsertWriteId> WriteIdsToAbort;
std::shared_ptr<NOlap::IBlobsDeclareRemovingAction> BlobsAction;
public:
TTxInsertTableCleanup(TColumnShard* self, THashSet<TWriteId>&& writeIdsToAbort)
TTxInsertTableCleanup(TColumnShard* self, THashSet<TInsertWriteId>&& writeIdsToAbort)
: TBase(self)
, WriteIdsToAbort(std::move(writeIdsToAbort)) {
Y_ABORT_UNLESS(WriteIdsToAbort.size() || self->InsertTable->GetAborted().size());
Expand Down
63 changes: 30 additions & 33 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include "tx_write.h"

#include <ydb/core/tx/columnshard/engines/insert_table/user_data.h>
#include <ydb/core/tx/columnshard/transactions/locks/write.h>

namespace NKikimr::NColumnShard {

bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) {
bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) {
NKikimrTxColumnShard::TLogicalMetadata meta;
meta.SetNumRows(batch->GetRowsCount());
meta.SetRawBytes(batch->GetRawBytes());
Expand All @@ -23,9 +25,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
auto schemeVersion = batch.GetAggregation().GetSchemaVersion();
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);

NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange,
meta, tableSchema->GetVersion(),
batch->GetData());
auto userData = std::make_shared<NOlap::TUserData>(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData());
NOlap::TInsertedData insertData(writeId, userData);
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
Self->UpdateInsertTableCounters();
Expand All @@ -36,7 +37,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali

bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
TMemoryProfileGuard mpg("TTxWrite::Execute");
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
ACFL_DEBUG("event", "start_execute");
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
for (auto&& aggr : buffer.GetAggregations()) {
Expand All @@ -45,33 +47,27 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
if (writeMeta.HasLongTxId()) {
NIceDb::TNiceDb db(txc.DB);
const TInsertWriteId insertWriteId =
Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId(), writeMeta.GetGranuleShardingVersion());
aggr->AddInsertWriteId(insertWriteId);
if (writeMeta.IsGuaranteeWriter()) {
AFL_VERIFY(aggr->GetSplittedBlobs().size() == 1)("count", aggr->GetSplittedBlobs().size());
} else {
AFL_VERIFY(aggr->GetSplittedBlobs().size() <= 1)("count", aggr->GetSplittedBlobs().size());
}
if (aggr->GetSplittedBlobs().size() == 1) {
AFL_VERIFY(InsertOneBlob(txc, aggr->GetSplittedBlobs().front(), insertWriteId))("write_id", writeMeta.GetWriteId())(
"insert_write_id", insertWriteId);
}
} else {
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
}

auto writeId = TWriteId(writeMeta.GetWriteId());
if (!operation) {
NIceDb::TNiceDb db(txc.DB);
writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId(), writeMeta.GetGranuleShardingVersion());
aggr->AddWriteId(writeId);
}

for (auto&& i : aggr->GetSplittedBlobs()) {
if (operation) {
writeId = Self->BuildNextWriteId(txc);
aggr->AddWriteId(writeId);
}

if (!InsertOneBlob(txc, i, writeId)) {
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_DUPLICATE);
for (auto&& i : aggr->GetSplittedBlobs()) {
const TInsertWriteId insertWriteId = Self->InsertTable->BuildNextWriteId(txc);
aggr->AddInsertWriteId(insertWriteId);
AFL_VERIFY(InsertOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)(
"size", aggr->GetSplittedBlobs().size());
}
}
}
Expand All @@ -88,9 +84,9 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteMeta();
if (!writeMeta.HasLongTxId()) {
auto operation = Self->OperationsManager->GetOperationVerified((TWriteId)writeMeta.GetWriteId());
auto operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
operation->OnWriteFinish(txc, aggr->GetWriteIds(), operation->GetBehaviour() == EOperationBehaviour::NoTxWrite);
operation->OnWriteFinish(txc, aggr->GetInsertWriteIds(), operation->GetBehaviour() == EOperationBehaviour::NoTxWrite);
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
Expand Down Expand Up @@ -119,8 +115,9 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
}
} else {
Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1);
auto ev = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
Y_ABORT_UNLESS(aggr->GetInsertWriteIds().size() == 1);
auto ev = std::make_unique<TEvColumnShard::TEvWriteResult>(
Self->TabletID(), writeMeta, (ui64)aggr->GetInsertWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
Results.emplace_back(std::move(ev), writeMeta.GetSource(), 0);
}
}
Expand All @@ -129,7 +126,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {

void TTxWrite::Complete(const TActorContext& ctx) {
TMemoryProfileGuard mpg("TTxWrite::Complete");
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "complete");
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "complete");
const auto now = TMonotonic::Now();
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
for (auto&& i : buffer.GetAddActions()) {
Expand All @@ -149,7 +147,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteMeta();
if (!writeMeta.HasLongTxId()) {
auto op = Self->GetOperationsManager().GetOperationVerified(NOlap::TWriteId(writeMeta.GetWriteId()));
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock || op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(writeMeta.GetTableId(),
buffer.GetAggregations()[i]->GetRecordBatch(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
Expand All @@ -158,12 +156,11 @@ void TTxWrite::Complete(const TActorContext& ctx) {
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), Self->GetLastTxSnapshot());
}

}
Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
}
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
}

}
} // namespace NKikimr::NColumnShard
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
std::vector<std::shared_ptr<TTxController::ITransactionOperator>> ResultOperators;


bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId);
bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId);

TStringBuilder TxPrefix() const {
return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] ";
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ void TColumnShard::UpdateIndexCounters() {

ui64 TColumnShard::MemoryUsage() const {
ui64 memory = ProgressTxController->GetMemoryUsage() + ScanTxInFlight.size() * (sizeof(ui64) + sizeof(TInstant)) +
LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) +
LongTxWrites.size() * (sizeof(TInsertWriteId) + sizeof(TLongTxWriteInfo)) +
LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) +
(WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) +
Counters.GetTabletCounters()->GetValue(COUNTER_PREPARED_RECORDS) * sizeof(NOlap::TInsertedData) +
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ void TTxInit::SetDefaults() {
Self->CurrentSchemeShardId = 0;
Self->LastSchemaSeqNo = { };
Self->ProcessingParams.reset();
Self->LastWriteId = TWriteId{0};
Self->LastPlannedStep = 0;
Self->LastPlannedTxId = 0;
Self->LastCompletedTx = NOlap::TSnapshot::Zero();
Expand Down Expand Up @@ -73,7 +72,6 @@ bool TTxInit::Precharge(TTransactionContext& txc) {
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastSchemaSeqNoGeneration, Self->LastSchemaSeqNo.Generation);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastSchemaSeqNoRound, Self->LastSchemaSeqNo.Round);
ready = ready && Schema::GetSpecialProtoValue(db, Schema::EValueIds::ProcessingParams, Self->ProcessingParams);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastWriteId, Self->LastWriteId);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastPlannedStep, Self->LastPlannedStep);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastPlannedTxId, Self->LastPlannedTxId);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo);
Expand Down Expand Up @@ -107,7 +105,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
ACFL_DEBUG("step", "TInsertTable::Load_Start");
TMemoryProfileGuard g("TTxInit/InsertTable");
auto localInsertTable = std::make_unique<NOlap::TInsertTable>();
if (!localInsertTable->Load(dbTable, TAppData::TimeProvider->Now())) {
if (!localInsertTable->Load(db, dbTable, TAppData::TimeProvider->Now())) {
ACFL_ERROR("step", "TInsertTable::Load_Fails");
return false;
}
Expand Down Expand Up @@ -182,7 +180,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}

while (!rowset.EndOfSet()) {
const TWriteId writeId = TWriteId{ rowset.GetValue<Schema::LongTxWrites::WriteId>() };
const TInsertWriteId writeId = (TInsertWriteId)rowset.GetValue<Schema::LongTxWrites::WriteId>();
const ui32 writePartId = rowset.GetValue<Schema::LongTxWrites::WritePartId>();
NKikimrLongTxService::TLongTxId proto;
Y_ABORT_UNLESS(proto.ParseFromString(rowset.GetValue<Schema::LongTxWrites::LongTxId>()));
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode);
ctx.Send(writeMeta.GetSource(), result.release());
} else {
auto operation = OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
auto operation = OperationsManager->GetOperation((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetLockId(),
ev->Get()->GetWriteResultStatus(), ev->Get()->GetErrorMessage() ? ev->Get()->GetErrorMessage() : "put data fails");
Expand Down Expand Up @@ -324,7 +324,8 @@ class TCommitOperation {
return TConclusionStatus::Success();
}

std::unique_ptr<NColumnShard::TEvWriteCommitSyncTransactionOperator> CreateTxOperator(const NKikimrTxColumnShard::ETransactionKind kind) const {
std::unique_ptr<NColumnShard::TEvWriteCommitSyncTransactionOperator> CreateTxOperator(
const NKikimrTxColumnShard::ETransactionKind kind) const {
AFL_VERIFY(ReceivingShards.size());
if (IsPrimary()) {
return std::make_unique<NColumnShard::TEvWriteCommitPrimaryTransactionOperator>(
Expand Down Expand Up @@ -429,7 +430,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
const auto source = ev->Sender;
const auto cookie = ev->Cookie;
const auto behaviour = TOperationsManager::GetBehaviour(*ev->Get());
// AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString());
// AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString());
if (behaviour == EOperationBehaviour::Undefined) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
Expand All @@ -447,8 +448,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
auto commitOperation = std::make_shared<TCommitOperation>(TabletID());
const auto sendError = [&](const TString& message, const NKikimrDataEvents::TEvWriteResult::EStatus status) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result =
NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, status, message);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, status, message);
ctx.Send(source, result.release(), 0, cookie);
};
auto conclusionParse = commitOperation->Parse(*ev->Get());
Expand All @@ -466,7 +466,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
" != " + ::ToString(commitOperation->GetGeneration()),
NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN);
} else if (lockInfo->GetInternalGenerationCounter() != commitOperation->GetInternalGenerationCounter()) {
sendError("tablet lock have another internal generation counter: " + ::ToString(lockInfo->GetInternalGenerationCounter()) +
sendError(
"tablet lock have another internal generation counter: " + ::ToString(lockInfo->GetInternalGenerationCounter()) +
" != " + ::ToString(commitOperation->GetInternalGenerationCounter()),
NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN);
} else {
Expand Down
18 changes: 0 additions & 18 deletions ydb/core/tx/columnshard/columnshard_common.cpp

This file was deleted.

94 changes: 0 additions & 94 deletions ydb/core/tx/columnshard/columnshard_common.h

This file was deleted.

Loading

0 comments on commit 71659be

Please sign in to comment.