Skip to content

Commit

Permalink
fix error on start internal scanner (#11289)
Browse files Browse the repository at this point in the history
Co-authored-by: vlad-gogov <vlad-gogov@ydb.tech>
  • Loading branch information
ivanmorozov333 and vlad-gogov authored Nov 12, 2024
1 parent 7fe34bf commit d4c693f
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 11 deletions.
25 changes: 25 additions & 0 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2960,6 +2960,31 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
}

Y_UNIT_TEST(ScanFailedSnapshotTooOld) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableColumnShardConfig()->SetMaxReadStaleness_ms(5000);
auto settings = TKikimrSettings().SetAppConfig(appConfig).SetWithSampleTables(false);
TTestHelper testHelper(settings);

TTestHelper::TColumnTable cnt;
TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("key").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
TTestHelper::TColumnSchema().SetName("c").SetType(NScheme::NTypeIds::Int32).SetNullable(true)
};
cnt.SetName("/Root/cnt").SetPrimaryKey({ "key" }).SetSchema(schema);
testHelper.CreateTable(cnt);
Sleep(TDuration::Seconds(10));
auto client = testHelper.GetKikimr().GetQueryClient();
auto result =
client
.ExecuteQuery(
TStringBuilder() << "$v = SELECT CAST(COUNT(*) AS INT32) FROM `/Root/cnt`; INSERT INTO `/Root/cnt` (key, c) values(1, $v);",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
}
}

}
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());

NOlap::TWritingContext context(TabletID(), SelfId(), snapshotSchema, StoragesManager,
Counters.GetIndexationCounters().SplitterCounters, Counters.GetCSCounters().WritingCounters);
Counters.GetIndexationCounters().SplitterCounters, Counters.GetCSCounters().WritingCounters, GetLastTxSnapshot());
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildBatchesTask>(
BufferizationWriteActorId, std::move(writeData), GetLastTxSnapshot(), context);
BufferizationWriteActorId, std::move(writeData), context);
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
}
}
Expand Down Expand Up @@ -591,7 +591,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
writeOperation->SetBehaviour(behaviour);
NOlap::TWritingContext wContext(
pathId, SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
Counters.GetCSCounters().WritingCounters);
Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max());
arrowData->SetSeparationPoints(GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(pathId)->GetBucketPositions());
writeOperation->Start(*this, arrowData, source, wContext);
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/data_reader/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanData::TPtr& ev) {
} else {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "restore_task_finished")("reason", status.GetErrorMessage());
}
PassAway();
}
}

Expand All @@ -35,10 +36,11 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanInitActor::TPtr& ev) {
}

void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanError::TPtr& ev) {
SwitchStage(EStage::WaitData, EStage::Finished);
SwitchStage(std::nullopt, EStage::Finished);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "problem_on_restore_data")(
"reason", NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
RestoreTask->OnError(NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
PassAway();
}

void TActor::Bootstrap(const TActorContext& /*ctx*/) {
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/data_reader/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ class TActor: public NActors::TActorBootstrapped<TActor> {

EStage Stage = EStage::Initialization;
static inline const ui64 FreeSpace = ((ui64)8) << 20;
void SwitchStage(const EStage from, const EStage to) {
AFL_VERIFY(Stage == from)("from", (ui32)from)("real", (ui32)Stage)("to", (ui32)to);
void SwitchStage(const std::optional<EStage> from, const EStage to) {
if (from) {
AFL_VERIFY(Stage == *from)("from", (ui32)*from)("real", (ui32)Stage)("to", (ui32)to);
}
Stage = to;
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/operations/batch_builder/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ class TBuildBatchesTask: public NConveyor::ITask {
}

TBuildBatchesTask(
const NActors::TActorId bufferActorId, NEvWrite::TWriteData&& writeData, const TSnapshot& actualSnapshot, const TWritingContext& context)
const NActors::TActorId bufferActorId, NEvWrite::TWriteData&& writeData, const TWritingContext& context)
: WriteData(std::move(writeData))
, BufferActorId(bufferActorId)
, ActualSnapshot(actualSnapshot)
, ActualSnapshot(context.GetApplyToSnapshot())
, Context(context) {
}
};
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/columnshard/operations/common/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@ class TWritingContext {
YDB_READONLY_DEF(std::shared_ptr<IStoragesManager>, StoragesManager);
YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TSplitterCounters>, SplitterCounters);
YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TWriteCounters>, WritingCounters);
YDB_READONLY(TSnapshot, ApplyToSnapshot, TSnapshot::Zero());

public:
TWritingContext(const ui64 tabletId, const NActors::TActorId& tabletActorId, const std::shared_ptr<ISnapshotSchema>& actualSchema,
const std::shared_ptr<IStoragesManager>& operators, const std::shared_ptr<NColumnShard::TSplitterCounters>& splitterCounters,
const std::shared_ptr<NColumnShard::TWriteCounters>& writingCounters)
const std::shared_ptr<NColumnShard::TWriteCounters>& writingCounters, const TSnapshot& applyToSnapshot)
: TabletId(tabletId)
, TabletActorId(tabletActorId)
, ActualSchema(actualSchema)
, StoragesManager(operators)
, SplitterCounters(splitterCounters)
, WritingCounters(writingCounters) {
, WritingCounters(writingCounters)
, ApplyToSnapshot(applyToSnapshot)
{
}
};
} // namespace NKikimr::NOlap
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/operations/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void TWriteOperation::Start(
NEvWrite::TWriteData writeData(writeMeta, data, owner.TablesManager.GetPrimaryIndex()->GetReplaceKey(),
owner.StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING_OPERATOR), WritePortions);
std::shared_ptr<NConveyor::ITask> task =
std::make_shared<NOlap::TBuildBatchesTask>(owner.BufferizationWriteActorId, std::move(writeData), owner.GetLastTxSnapshot(), context);
std::make_shared<NOlap::TBuildBatchesTask>(owner.BufferizationWriteActorId, std::move(writeData), context);
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);

Status = EOperationStatus::Started;
Expand Down

0 comments on commit d4c693f

Please sign in to comment.