Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

immediate write for bulk upsert #9489

Merged
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
appConfig.MutableTableServiceConfig()->SetEnableRowsDuplicationCheck(true);
ServerSettings->SetAppConfig(appConfig);
ServerSettings->SetFeatureFlags(settings.FeatureFlags);
ServerSettings->FeatureFlags.SetEnableImmediateWritingOnBulkUpsert(true);
ServerSettings->SetNodeCount(settings.NodeCount);
ServerSettings->SetEnableKqpSpilling(enableSpilling);
ServerSettings->SetEnableDataColumnForIndexTable(true);
Expand Down
9 changes: 1 addition & 8 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() <<
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`, EXTERNAL_GUARANTEE_EXCLUSIVE_PK=`true`);";
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
Expand Down Expand Up @@ -336,13 +336,6 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() <<
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, EXTERNAL_GUARANTEE_EXCLUSIVE_PK=`true`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

std::vector<TString> uids;
std::vector<TString> resourceIds;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8172,7 +8172,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
csController->WaitCompactions(TDuration::Seconds(5));
}

testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest`", "[[#];[#];[[42u]];[[43u]]]");
testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` ORDER BY value", "[[#];[#];[[42u]];[[43u]]]");
}

Y_UNIT_TEST(DropThenAddColumn) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/data_events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ message TEvWrite {
repeated uint32 ColumnIds = 3 [packed = true];
optional uint64 PayloadIndex = 4;
optional EDataFormat PayloadFormat = 5;
optional string PayloadSchema = 6;
}

// Transaction operations
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,5 @@ message TFeatureFlags {
optional bool EnableExternalDataSourcesOnServerless = 143 [default = true];
optional bool EnableSparsedColumns = 144 [default = false];
optional bool EnableParameterizedDecimal = 145 [default = false];
optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
}
52 changes: 27 additions & 25 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,11 @@
namespace NKikimr::NColumnShard {

bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) {
NKikimrTxColumnShard::TLogicalMetadata meta;
meta.SetNumRows(batch->GetRowsCount());
meta.SetRawBytes(batch->GetRawBytes());
meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds());
meta.SetSpecialKeysRawData(batch->GetSpecialKeysFullSafe());
meta.SetSpecialKeysPayloadData(batch->GetSpecialKeysPayloadSafe());

const auto& blobRange = batch.GetRange();
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());
auto userData = batch.BuildInsertionUserData(*Self);
NOlap::TInsertedData insertData(writeId, userData);

// First write wins
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);

const auto& writeMeta = batch.GetAggregation().GetWriteMeta();
meta.SetModificationType(TEnumOperator<NEvWrite::EModificationType>::SerializeToProto(writeMeta.GetModificationType()));
*meta.MutableSchemaSubset() = batch.GetAggregation().GetSchemaSubset().SerializeToProto();
auto schemeVersion = batch.GetAggregation().GetSchemaVersion();
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);

auto userData = std::make_shared<NOlap::TUserData>(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData());
NOlap::TInsertedData insertData(writeId, userData);
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
Self->UpdateInsertTableCounters();
Expand All @@ -36,6 +19,18 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
return false;
}

bool TTxWrite::CommitOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) {
auto userData = batch.BuildInsertionUserData(*Self);
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
NOlap::TCommittedData commitData(userData, Self->GetLastPlannedSnapshot(), Self->Generation(), writeId);
if (Self->TablesManager.HasTable(userData->GetPathId())) {
Self->InsertTable->CommitEphemeral(dbTable, std::move(commitData));
}
Self->UpdateInsertTableCounters();
return true;
}

bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
TMemoryProfileGuard mpg("TTxWrite::Execute");
NActors::TLogContextGuard logGuard =
Expand Down Expand Up @@ -65,10 +60,17 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
for (auto&& i : aggr->GetSplittedBlobs()) {
const TInsertWriteId insertWriteId = Self->InsertTable->BuildNextWriteId(txc);
aggr->AddInsertWriteId(insertWriteId);
AFL_VERIFY(InsertOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)(
"size", aggr->GetSplittedBlobs().size());
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
static TAtomicCounter Counter = 0;
const TInsertWriteId insertWriteId = (TInsertWriteId)Counter.Inc();
AFL_VERIFY(CommitOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)(
"size", aggr->GetSplittedBlobs().size());
} else {
const TInsertWriteId insertWriteId = Self->InsertTable->BuildNextWriteId(txc);
aggr->AddInsertWriteId(insertWriteId);
AFL_VERIFY(InsertOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)(
"size", aggr->GetSplittedBlobs().size());
}
}
}
}
Expand All @@ -92,8 +94,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
Self->OperationsManager->AddTemporaryTxLink(operation->GetLockId());
Self->OperationsManager->CommitTransactionOnExecute(*Self, operation->GetLockId(), txc, Self->GetLastTxSnapshot());
} else if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
NKikimrTxColumnShard::TCommitWriteTxBody proto;
proto.SetLockId(operation->GetLockId());
Expand Down Expand Up @@ -156,13 +156,15 @@ void TTxWrite::Complete(const TActorContext& ctx) {
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
}
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
Self->OperationsManager->AddTemporaryTxLink(op->GetLockId());
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), Self->GetLastTxSnapshot());
}
}
Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
}
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
Self->SetupIndexation();
}

} // namespace NKikimr::NColumnShard
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult;
const ui32 TabletTxNo;

bool CommitOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId);
bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId);

class TReplyInfo {
private:
std::unique_ptr<NActors::IEventBase> Event;
Expand All @@ -43,8 +46,6 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
std::vector<std::shared_ptr<TTxController::ITransactionOperator>> ResultOperators;


bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId);

TStringBuilder TxPrefix() const {
return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] ";
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,10 @@ class TColumnShard
public:
ui64 TabletTxCounter = 0;

const TTablesManager& GetTablesManager() const {
return TablesManager;
}

bool HasLongTxWrites(const TInsertWriteId insertWriteId) const {
return LongTxWrites.contains(insertWriteId);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/insert_table/committed.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class TCommittedData: public TUserDataContainer {
, DedupId(dedupId) {
}

TCommittedData(const std::shared_ptr<TUserData>& userData, const TSnapshot& ss, const TInsertWriteId insertWriteId)
TCommittedData(const std::shared_ptr<TUserData>& userData, const TSnapshot& ss, const ui64 generation, const TInsertWriteId ephemeralWriteId)
: TBase(userData)
, Snapshot(ss)
, DedupId(ToString(ss.GetPlanStep()) + ":" + ToString((ui64)insertWriteId)) {
, DedupId(ToString(generation) + ":" + ToString(ephemeralWriteId)) {
}

void SetRemove() {
Expand Down
16 changes: 16 additions & 0 deletions ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ TInsertionSummary::TCounters TInsertTable::Commit(
return counters;
}

TInsertionSummary::TCounters TInsertTable::CommitEphemeral(IDbWrapper& dbTable, TCommittedData&& data) {
TInsertionSummary::TCounters counters;
counters.Rows += data.GetMeta().GetNumRows();
counters.RawBytes += data.GetMeta().GetRawBytes();
counters.Bytes += data.BlobSize();

AddBlobLink(data.GetBlobRange().BlobId);
const ui64 pathId = data.GetPathId();
auto& pathInfo = Summary.GetPathInfo(pathId);
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "commit_insertion")("path_id", pathId)("blob_range", data.GetBlobRange().ToString());
dbTable.Commit(data);
pathInfo.AddCommitted(std::move(data));

return counters;
}

void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet<TInsertWriteId>& writeIds) {
Y_ABORT_UNLESS(!writeIds.empty());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class TInsertTable: public TInsertTableAccessor {
bool Insert(IDbWrapper& dbTable, TInsertedData&& data);
TInsertionSummary::TCounters Commit(
IDbWrapper& dbTable, ui64 planStep, ui64 txId, const THashSet<TInsertWriteId>& writeIds, std::function<bool(ui64)> pathExists);
TInsertionSummary::TCounters CommitEphemeral(IDbWrapper& dbTable, TCommittedData&& data);
void Abort(IDbWrapper& dbTable, const THashSet<TInsertWriteId>& writeIds);
void MarkAsNotAbortable(const TInsertWriteId writeId) {
Summary.MarkAsNotAbortable(writeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace NKikimr::NOlap {

TCommittedData TInsertedData::Commit(const ui64 planStep, const ui64 txId) {
TCommittedData TInsertedData::Commit(const ui64 planStep, const ui64 txId) const {
return TCommittedData(UserData, planStep, txId, InsertWriteId);
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/insert_table/inserted.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class TInsertedData: public TUserDataContainer {
/// One of them wins and becomes committed. Original DedupId would be lost then.
/// After commit we use original Initiator:WriteId as DedupId of inserted blob inside {PlanStep, TxId}.
/// pathId, initiator, {writeId}, {dedupId} -> pathId, planStep, txId, {dedupId}
[[nodiscard]] TCommittedData Commit(const ui64 planStep, const ui64 txId);
[[nodiscard]] TCommittedData Commit(const ui64 planStep, const ui64 txId) const;
};

} // namespace NKikimr::NOlap
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
break;
}
}
if (MergingContext->IsExclusiveInterval() && sourcesInMemory) {
if ((MergingContext->IsExclusiveInterval() || Context->GetCommonContext()->GetReadMetadata()->HasGuaranteeExclusivePK()) &&
sourcesInMemory) {
TMemoryProfileGuard mGuard("SCAN_PROFILE::MERGE::EXCLUSIVE", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
auto& container = Sources.begin()->second->GetStageResult().GetBatch();
if (container && container->num_rows()) {
Expand Down
14 changes: 8 additions & 6 deletions ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
engine.Load(db);

std::vector<TCommittedData> dataToIndex = {
TCommittedData(TUserData::Build(paths[0], blobRanges[0], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(1, 2), (TInsertWriteId)2),
TCommittedData(TUserData::Build(paths[0], blobRanges[1], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(2, 1), (TInsertWriteId)1)
TCommittedData(TUserData::Build(paths[0], blobRanges[0], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(1, 2), 0, (TInsertWriteId)2),
TCommittedData(TUserData::Build(paths[0], blobRanges[1], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(2, 1), 0, (TInsertWriteId)1)
};

// write
Expand Down Expand Up @@ -553,7 +553,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
std::vector<TCommittedData> dataToIndex;
TSnapshot ss(planStep, txId);
dataToIndex.push_back(
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, (TInsertWriteId)txId));
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, (TInsertWriteId)txId));

bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
Expand Down Expand Up @@ -651,7 +651,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TCommittedData> dataToIndex;
TSnapshot ss(planStep, txId);
dataToIndex.push_back(TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, (TInsertWriteId)txId));
dataToIndex.push_back(
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, (TInsertWriteId)txId));

bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step);
blobsAll.Merge(std::move(blobs));
Expand Down Expand Up @@ -682,7 +683,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TCommittedData> dataToIndex;
TSnapshot ss(planStep, txId);
dataToIndex.push_back(TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, TInsertWriteId(txId)));
dataToIndex.push_back(
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, TInsertWriteId(txId)));

bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
Expand Down Expand Up @@ -730,7 +732,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TSnapshot ss(planStep, txId);
std::vector<TCommittedData> dataToIndex;
dataToIndex.push_back(
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, TInsertWriteId(txId)));
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, TInsertWriteId(txId)));

bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
#include "indexed_blob_constructor.h"

#include <ydb/core/tx/columnshard/defs.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>

#include <ydb/core/tx/columnshard/defs.h>

namespace NKikimr::NOlap {

TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const std::shared_ptr<IBlobsWritingAction>& action, std::vector<std::shared_ptr<TWriteAggregation>>&& aggregations)
TIndexedWriteController::TIndexedWriteController(
const TActorId& dstActor, const std::shared_ptr<IBlobsWritingAction>& action, std::vector<std::shared_ptr<TWriteAggregation>>&& aggregations)
: Buffer(action, std::move(aggregations))
, DstActor(dstActor)
{
, DstActor(dstActor) {
auto blobs = Buffer.GroupIntoBlobs();
for (auto&& b : blobs) {
auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.ExtractBlobData(), action));
Expand All @@ -33,6 +33,26 @@ void TWideSerializedBatch::InitBlobId(const TUnifiedBlobId& id) {
Range.BlobId = id;
}

std::shared_ptr<NKikimr::NOlap::TUserData> TWideSerializedBatch::BuildInsertionUserData(const NColumnShard::TColumnShard& owner) const {
NKikimrTxColumnShard::TLogicalMetadata meta;
meta.SetNumRows(SplittedBlobs.GetRowsCount());
meta.SetRawBytes(SplittedBlobs.GetRawBytes());
meta.SetDirtyWriteTimeSeconds(GetStartInstant().Seconds());
meta.SetSpecialKeysRawData(SplittedBlobs.GetSpecialKeysFullSafe());
meta.SetSpecialKeysPayloadData(SplittedBlobs.GetSpecialKeysPayloadSafe());

const auto& blobRange = Range;
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());

const auto& writeMeta = GetAggregation().GetWriteMeta();
meta.SetModificationType(TEnumOperator<NEvWrite::EModificationType>::SerializeToProto(writeMeta.GetModificationType()));
*meta.MutableSchemaSubset() = GetAggregation().GetSchemaSubset().SerializeToProto();
auto schemeVersion = GetAggregation().GetSchemaVersion();
auto tableSchema = owner.GetTablesManager().GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);

return std::make_shared<NOlap::TUserData>(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), SplittedBlobs.GetData());
}

void TWritingBuffer::InitReadyInstant(const TMonotonic instant) {
for (auto&& aggr : Aggregations) {
aggr->MutableWriteMeta().SetWriteMiddle5StartInstant(instant);
Expand Down Expand Up @@ -89,4 +109,4 @@ TString TWritingBlob::ExtractBlobData() {
return result;
}

}
} // namespace NKikimr::NOlap
Loading
Loading