Skip to content

Commit

Permalink
Merge 50031e7 into c0d60de
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 8, 2024
2 parents c0d60de + 50031e7 commit b53f33b
Show file tree
Hide file tree
Showing 51 changed files with 775 additions and 725 deletions.
3 changes: 1 addition & 2 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2897,13 +2897,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

WriteTestData(kikimr, "/Root/olapStore/olapTable0", 0, 1000000, 3, true);


auto client = kikimr.GetQueryClient();
{
auto result = client.ExecuteQuery(R"(
SELECT * FROM `/Root/olapStore/olapTable0` ORDER BY timestamp;
INSERT INTO `/Root/olapStore/olapTable1` SELECT * FROM `/Root/olapStore/olapTable0`;
INSERT INTO `/Root/olapStore/olapTable0` SELECT * FROM `/Root/olapStore/olapTable1`;
REPLACE INTO `/Root/olapStore/olapTable0` SELECT * FROM `/Root/olapStore/olapTable1`;
SELECT * FROM `/Root/olapStore/olapTable1` ORDER BY timestamp;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
namespace NKikimr::NColumnShard {
class TTxInsertTableCleanup: public TTransactionBase<TColumnShard> {
private:
THashSet<TWriteId> WriteIdsToAbort;
THashSet<TInsertWriteId> WriteIdsToAbort;
std::shared_ptr<NOlap::IBlobsDeclareRemovingAction> BlobsAction;
public:
TTxInsertTableCleanup(TColumnShard* self, THashSet<TWriteId>&& writeIdsToAbort)
TTxInsertTableCleanup(TColumnShard* self, THashSet<TInsertWriteId>&& writeIdsToAbort)
: TBase(self)
, WriteIdsToAbort(std::move(writeIdsToAbort)) {
Y_ABORT_UNLESS(WriteIdsToAbort.size() || self->InsertTable->GetAborted().size());
Expand Down
64 changes: 31 additions & 33 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include "tx_write.h"

#include <ydb/core/tx/columnshard/engines/insert_table/user_data.h>
#include <ydb/core/tx/columnshard/transactions/locks/write.h>

namespace NKikimr::NColumnShard {

bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) {
bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) {
NKikimrTxColumnShard::TLogicalMetadata meta;
meta.SetNumRows(batch->GetRowsCount());
meta.SetRawBytes(batch->GetRawBytes());
Expand All @@ -23,9 +25,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
auto schemeVersion = batch.GetAggregation().GetSchemaVersion();
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);

NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange,
meta, tableSchema->GetVersion(),
batch->GetData());
auto userData = std::make_shared<NOlap::TUserData>(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData());
NOlap::TInsertedData insertData(writeId, userData);
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
Self->UpdateInsertTableCounters();
Expand All @@ -36,7 +37,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali

bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
TMemoryProfileGuard mpg("TTxWrite::Execute");
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
ACFL_DEBUG("event", "start_execute");
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
for (auto&& aggr : buffer.GetAggregations()) {
Expand All @@ -45,33 +47,27 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
if (writeMeta.HasLongTxId()) {
NIceDb::TNiceDb db(txc.DB);
const TInsertWriteId insertWriteId =
Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId(), writeMeta.GetGranuleShardingVersion());
aggr->AddInsertWriteId(insertWriteId);
if (writeMeta.IsGuaranteeWriter()) {
AFL_VERIFY(aggr->GetSplittedBlobs().size() == 1)("count", aggr->GetSplittedBlobs().size());
} else {
AFL_VERIFY(aggr->GetSplittedBlobs().size() <= 1)("count", aggr->GetSplittedBlobs().size());
}
if (aggr->GetSplittedBlobs().size() == 1) {
AFL_VERIFY(InsertOneBlob(txc, aggr->GetSplittedBlobs().front(), insertWriteId))("write_id", writeMeta.GetWriteId())(
"insert_write_id", insertWriteId);
}
} else {
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
}

auto writeId = TWriteId(writeMeta.GetWriteId());
if (!operation) {
NIceDb::TNiceDb db(txc.DB);
writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId(), writeMeta.GetGranuleShardingVersion());
aggr->AddWriteId(writeId);
}

for (auto&& i : aggr->GetSplittedBlobs()) {
if (operation) {
writeId = Self->BuildNextWriteId(txc);
aggr->AddWriteId(writeId);
}

if (!InsertOneBlob(txc, i, writeId)) {
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_DUPLICATE);
for (auto&& i : aggr->GetSplittedBlobs()) {
const TInsertWriteId insertWriteId = Self->InsertTable->BuildNextWriteId(txc);
aggr->AddInsertWriteId(insertWriteId);
AFL_VERIFY(InsertOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)(
"size", aggr->GetSplittedBlobs().size());
}
}
}
Expand All @@ -88,9 +84,10 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteMeta();
if (!writeMeta.HasLongTxId()) {
auto operation = Self->OperationsManager->GetOperationVerified((TWriteId)writeMeta.GetWriteId());
auto operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
operation->OnWriteFinish(txc, aggr->GetWriteIds(), operation->GetBehaviour() == EOperationBehaviour::NoTxWrite);
operation->OnWriteFinish(txc, aggr->GetInsertWriteIds(), operation->GetBehaviour() == EOperationBehaviour::NoTxWrite);
Self->OperationsManager->LinkInsertWriteIdToOperationWriteId(aggr->GetInsertWriteIds(), operation->GetWriteId());
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
Expand Down Expand Up @@ -119,8 +116,9 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
}
} else {
Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1);
auto ev = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
Y_ABORT_UNLESS(aggr->GetInsertWriteIds().size() == 1);
auto ev = std::make_unique<TEvColumnShard::TEvWriteResult>(
Self->TabletID(), writeMeta, (ui64)aggr->GetInsertWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
Results.emplace_back(std::move(ev), writeMeta.GetSource(), 0);
}
}
Expand All @@ -129,7 +127,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {

void TTxWrite::Complete(const TActorContext& ctx) {
TMemoryProfileGuard mpg("TTxWrite::Complete");
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "complete");
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "complete");
const auto now = TMonotonic::Now();
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
for (auto&& i : buffer.GetAddActions()) {
Expand All @@ -149,7 +148,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteMeta();
if (!writeMeta.HasLongTxId()) {
auto op = Self->GetOperationsManager().GetOperationVerified(NOlap::TWriteId(writeMeta.GetWriteId()));
auto op = Self->GetOperationsManager().GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock || op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(writeMeta.GetTableId(),
buffer.GetAggregations()[i]->GetRecordBatch(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
Expand All @@ -158,12 +157,11 @@ void TTxWrite::Complete(const TActorContext& ctx) {
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), Self->GetLastTxSnapshot());
}

}
Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
}
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
}

}
} // namespace NKikimr::NColumnShard
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
std::vector<std::shared_ptr<TTxController::ITransactionOperator>> ResultOperators;


bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId);
bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId);

TStringBuilder TxPrefix() const {
return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] ";
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ void TColumnShard::UpdateIndexCounters() {

ui64 TColumnShard::MemoryUsage() const {
ui64 memory = ProgressTxController->GetMemoryUsage() + ScanTxInFlight.size() * (sizeof(ui64) + sizeof(TInstant)) +
LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) +
LongTxWrites.size() * (sizeof(TInsertWriteId) + sizeof(TLongTxWriteInfo)) +
LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) +
(WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) +
Counters.GetTabletCounters()->GetValue(COUNTER_PREPARED_RECORDS) * sizeof(NOlap::TInsertedData) +
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ void TTxInit::SetDefaults() {
Self->CurrentSchemeShardId = 0;
Self->LastSchemaSeqNo = { };
Self->ProcessingParams.reset();
Self->LastWriteId = TWriteId{0};
Self->LastPlannedStep = 0;
Self->LastPlannedTxId = 0;
Self->LastCompletedTx = NOlap::TSnapshot::Zero();
Expand Down Expand Up @@ -73,7 +72,6 @@ bool TTxInit::Precharge(TTransactionContext& txc) {
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastSchemaSeqNoGeneration, Self->LastSchemaSeqNo.Generation);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastSchemaSeqNoRound, Self->LastSchemaSeqNo.Round);
ready = ready && Schema::GetSpecialProtoValue(db, Schema::EValueIds::ProcessingParams, Self->ProcessingParams);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastWriteId, Self->LastWriteId);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastPlannedStep, Self->LastPlannedStep);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastPlannedTxId, Self->LastPlannedTxId);
ready = ready && Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo);
Expand Down Expand Up @@ -107,7 +105,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
ACFL_DEBUG("step", "TInsertTable::Load_Start");
TMemoryProfileGuard g("TTxInit/InsertTable");
auto localInsertTable = std::make_unique<NOlap::TInsertTable>();
if (!localInsertTable->Load(dbTable, TAppData::TimeProvider->Now())) {
if (!localInsertTable->Load(db, dbTable, TAppData::TimeProvider->Now())) {
ACFL_ERROR("step", "TInsertTable::Load_Fails");
return false;
}
Expand Down Expand Up @@ -182,7 +180,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}

while (!rowset.EndOfSet()) {
const TWriteId writeId = TWriteId{ rowset.GetValue<Schema::LongTxWrites::WriteId>() };
const TInsertWriteId writeId = (TInsertWriteId)rowset.GetValue<Schema::LongTxWrites::WriteId>();
const ui32 writePartId = rowset.GetValue<Schema::LongTxWrites::WritePartId>();
NKikimrLongTxService::TLongTxId proto;
Y_ABORT_UNLESS(proto.ParseFromString(rowset.GetValue<Schema::LongTxWrites::LongTxId>()));
Expand Down
24 changes: 13 additions & 11 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode);
ctx.Send(writeMeta.GetSource(), result.release());
} else {
auto operation = OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
auto operation = OperationsManager->GetOperation((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetLockId(),
ev->Get()->GetWriteResultStatus(), ev->Get()->GetErrorMessage() ? ev->Get()->GetErrorMessage() : "put data fails");
Expand Down Expand Up @@ -289,7 +289,7 @@ class TCommitOperation {
}

TConclusionStatus Parse(const NEvents::TDataEvents::TEvWrite& evWrite) {
AFL_VERIFY(evWrite.Record.GetLocks().GetLocks().size() == 1);
AFL_VERIFY(evWrite.Record.GetLocks().GetLocks().size() >= 1);
auto& locks = evWrite.Record.GetLocks();
auto& lock = evWrite.Record.GetLocks().GetLocks()[0];
SendingShards = std::set<ui64>(locks.GetSendingShards().begin(), locks.GetSendingShards().end());
Expand Down Expand Up @@ -324,7 +324,8 @@ class TCommitOperation {
return TConclusionStatus::Success();
}

std::unique_ptr<NColumnShard::TEvWriteCommitSyncTransactionOperator> CreateTxOperator(const NKikimrTxColumnShard::ETransactionKind kind) const {
std::unique_ptr<NColumnShard::TEvWriteCommitSyncTransactionOperator> CreateTxOperator(
const NKikimrTxColumnShard::ETransactionKind kind) const {
AFL_VERIFY(ReceivingShards.size());
if (IsPrimary()) {
return std::make_unique<NColumnShard::TEvWriteCommitPrimaryTransactionOperator>(
Expand Down Expand Up @@ -428,15 +429,16 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
const auto& record = ev->Get()->Record;
const auto source = ev->Sender;
const auto cookie = ev->Cookie;
const auto behaviour = TOperationsManager::GetBehaviour(*ev->Get());
// AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString());
if (behaviour == EOperationBehaviour::Undefined) {
const auto behaviourConclusion = TOperationsManager::GetBehaviour(*ev->Get());
// AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("ev_write", record.DebugString());
if (behaviourConclusion.IsFail()) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "invalid write event");
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST,
"invalid write event: " + behaviourConclusion.GetErrorMessage());
ctx.Send(source, result.release(), 0, cookie);
return;
}
auto behaviour = *behaviourConclusion;

if (behaviour == EOperationBehaviour::AbortWriteLock) {
Execute(new TAbortWriteTransaction(this, record.GetLocks().GetLocks()[0].GetLockId(), source, cookie), ctx);
Expand All @@ -447,8 +449,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
auto commitOperation = std::make_shared<TCommitOperation>(TabletID());
const auto sendError = [&](const TString& message, const NKikimrDataEvents::TEvWriteResult::EStatus status) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result =
NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, status, message);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, status, message);
ctx.Send(source, result.release(), 0, cookie);
};
auto conclusionParse = commitOperation->Parse(*ev->Get());
Expand All @@ -466,7 +467,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
" != " + ::ToString(commitOperation->GetGeneration()),
NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN);
} else if (lockInfo->GetInternalGenerationCounter() != commitOperation->GetInternalGenerationCounter()) {
sendError("tablet lock have another internal generation counter: " + ::ToString(lockInfo->GetInternalGenerationCounter()) +
sendError(
"tablet lock have another internal generation counter: " + ::ToString(lockInfo->GetInternalGenerationCounter()) +
" != " + ::ToString(commitOperation->GetInternalGenerationCounter()),
NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN);
} else {
Expand Down
18 changes: 0 additions & 18 deletions ydb/core/tx/columnshard/columnshard_common.cpp

This file was deleted.

Loading

0 comments on commit b53f33b

Please sign in to comment.