Skip to content

Commit

Permalink
fix htap test for deletion (#9313)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 17, 2024
1 parent a37a4ae commit 5163b36
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 17 deletions.
20 changes: 13 additions & 7 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
, AsyncIoFactory(std::move(asyncIoFactory))
, UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink())
, HtapTx(tableServiceConfig.GetEnableHtapTx())
, FederatedQuerySetup(federatedQuerySetup)
, GUCSettings(GUCSettings)
, ShardIdToTableInfo(shardIdToTableInfo)
Expand Down Expand Up @@ -2353,6 +2352,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

absl::flat_hash_set<ui64> sendingShardsSet;
absl::flat_hash_set<ui64> receivingShardsSet;
absl::flat_hash_set<ui64> sendingColumnShardsSet;
absl::flat_hash_set<ui64> receivingColumnShardsSet;
ui64 arbiter = 0;
std::optional<ui64> columnShardArbiter;
Expand All @@ -2378,6 +2378,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (tx->HasLocks()) {
// Locks may be broken so shards with locks need to send readsets
sendingShardsSet.insert(shardId);

if (ShardIdToTableInfo->Get(shardId).IsOlap) {
sendingColumnShardsSet.insert(shardId);
}
}
if (ShardsWithEffects.contains(shardId)) {
// Volatile transactions may abort effects, so they send readsets
Expand All @@ -2387,7 +2391,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// Effects are only applied when all locks are valid
receivingShardsSet.insert(shardId);

if (HtapTx && ShardIdToTableInfo->Get(shardId).IsOlap) {
if (ShardIdToTableInfo->Get(shardId).IsOlap) {
receivingColumnShardsSet.insert(shardId);
}
}
Expand Down Expand Up @@ -2445,10 +2449,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

if (!receivingColumnShardsSet.empty()) {
AFL_ENSURE(HtapTx);
const ui32 index = RandomNumber<ui32>(receivingColumnShardsSet.size());
auto arbiterIterator = std::begin(receivingColumnShardsSet);
if (!receivingColumnShardsSet.empty() || !sendingColumnShardsSet.empty()) {
const auto& shards = receivingColumnShardsSet.empty()
? sendingColumnShardsSet
: receivingColumnShardsSet;

const ui32 index = RandomNumber<ui32>(shards.size());
auto arbiterIterator = std::begin(shards);
std::advance(arbiterIterator, index);
columnShardArbiter = *arbiterIterator;
}
Expand Down Expand Up @@ -2857,7 +2864,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
const bool UseEvWriteForOltp = false;
const bool HtapTx = false;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const TGUCSettings::TPtr GUCSettings;
TShardIdToTableInfoPtr ShardIdToTableInfo;
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3190,12 +3190,13 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
{
auto result = client.ExecuteQuery(R"(
SELECT * FROM `/Root/DataShard`;
SELECT * FROM `/Root/ColumnShard`;
SELECT * FROM `/Root/DataShard` ORDER BY Col1;
SELECT * FROM `/Root/ColumnShard` ORDER BY Col1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1)));
CompareYson(R"([[10u;"test1";10];[20u;"test2";11];[30u;"test3";12];[40u;"test";13];[101u;"test";101];[102u;"test";101];[103u;"test";101];[104u;"test";101];[1001u;"test";1001];[1002u;"test";1001];[1003u;"test";1001];[1004u;"test";1001]])",
FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[1u;"test";1];[2u;"test";1];[3u;"test";1];[4u;"test";1]])", FormatResultSetYson(result.GetResultSet(1)));
}
{
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class TGeneralCompactColumnEngineChanges;

namespace NKikimr::NColumnShard {

class TEvWriteCommitPrimaryTransactionOperator;
class TEvWriteCommitSecondaryTransactionOperator;
class TTxFinishAsyncTransaction;
class TTxInsertTableCleanup;
class TTxRemoveSharedBlobs;
Expand Down Expand Up @@ -138,6 +140,8 @@ class TColumnShard
: public TActor<TColumnShard>
, public NTabletFlatExecutor::TTabletExecutedFlat
{
friend class TEvWriteCommitSecondaryTransactionOperator;
friend class TEvWriteCommitPrimaryTransactionOperator;
friend class TTxInsertTableCleanup;
friend class TTxInit;
friend class TTxInitSchema;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/operations/slice_builder/builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*ta
if (OriginalBatch->num_columns() != indexSchema->num_fields()) {
AFL_VERIFY(OriginalBatch->num_columns() < indexSchema->num_fields())("original", OriginalBatch->num_columns())(
"index", indexSchema->num_fields());
if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableOptionalColumnsInColumnShard()) {
if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableOptionalColumnsInColumnShard() &&
WriteData.GetWriteMeta().GetModificationType() != NEvWrite::EModificationType::Delete) {
subset = NArrow::TSchemaSubset::AllFieldsAccepted();
const std::vector<ui32>& columnIdsVector = ActualSchema->GetIndexInfo().GetColumnIds(false);
const std::set<ui32> columnIdsSet(columnIdsVector.begin(), columnIdsVector.end());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))(
"receive", TabletId);
AFL_VERIFY(op->WaitShardsResultAck.erase(TabletId));
if (!op->WaitShardsResultAck.erase(TabletId)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet_duplication")("wait", JoinSeq(",", op->WaitShardsResultAck))(
"receive", TabletId);
}
op->CheckFinished(*Self);
}

Expand Down Expand Up @@ -188,7 +191,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
if (tabletId && *tabletId != i) {
continue;
}
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(
new TEvTxProcessing::TEvReadSetAck(0, GetTxId(), owner.TabletID(), i, owner.TabletID(), 0), i, true),
IEventHandle::FlagTrackDelivery, GetTxId());
Expand All @@ -202,7 +205,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
readSetData.SetDecision(*TxBroken ? NKikimrTx::TReadSetData::DECISION_ABORT : NKikimrTx::TReadSetData::DECISION_COMMIT);
for (auto&& i : ReceivingShards) {
if (WaitShardsResultAck.contains(i)) {
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet(TxInfo.PlanStep, GetTxId(), owner.TabletID(), i,
owner.TabletID(), readSetData.SerializeAsString()),
i, true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
}

void SendBrokenFlagAck(TColumnShard& owner) {
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(
new TEvTxProcessing::TEvReadSetAck(0, GetTxId(), owner.TabletID(), ArbiterTabletId, owner.TabletID(), 0), ArbiterTabletId, true),
IEventHandle::FlagTrackDelivery, GetTxId());
Expand All @@ -145,7 +145,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
void SendResult(TColumnShard& owner) {
NKikimrTx::TReadSetData readSetData;
readSetData.SetDecision(SelfBroken ? NKikimrTx::TReadSetData::DECISION_ABORT : NKikimrTx::TReadSetData::DECISION_COMMIT);
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet(
0, GetTxId(), owner.TabletID(), ArbiterTabletId, owner.TabletID(), readSetData.SerializeAsString()),
ArbiterTabletId, true),
Expand Down

0 comments on commit 5163b36

Please sign in to comment.