Skip to content

Commit

Permalink
Merge branch 'ydb-platform:stable-24-1' into stable-24-1
Browse files Browse the repository at this point in the history
  • Loading branch information
jepett0 authored Mar 12, 2024
2 parents d0014ad + 8688b17 commit 8faad73
Show file tree
Hide file tree
Showing 34 changed files with 412 additions and 147 deletions.
38 changes: 38 additions & 0 deletions ydb/apps/version/version_definition.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include <ydb/core/driver_lib/version/version.h>

NKikimrConfig::TCurrentCompatibilityInfo NKikimr::TCompatibilityInfo::MakeCurrent() {
using TCurrentConstructor = NKikimr::TCompatibilityInfo::TProtoConstructor::TCurrentCompatibilityInfo;
using TVersionConstructor = NKikimr::TCompatibilityInfo::TProtoConstructor::TVersion;
using TCompatibilityRuleConstructor = NKikimr::TCompatibilityInfo::TProtoConstructor::TCompatibilityRule;

return TCurrentConstructor{
.Application = "ydb",
.Version = TVersionConstructor{
.Year = 24,
.Major = 1,
},
.CanLoadFrom = {
TCompatibilityRuleConstructor{
.LowerLimit = TVersionConstructor{ .Year = 23, .Major = 4 },
.UpperLimit = TVersionConstructor{ .Year = 24, .Major = 1 },
},
},
.StoresReadableBy = {
TCompatibilityRuleConstructor{
.LowerLimit = TVersionConstructor{ .Year = 23, .Major = 4 },
.UpperLimit = TVersionConstructor{ .Year = 24, .Major = 1 },
},
},
.CanConnectTo = {
TCompatibilityRuleConstructor{
.LowerLimit = TVersionConstructor{ .Year = 23, .Major = 4 },
.UpperLimit = TVersionConstructor{ .Year = 24, .Major = 1 },
},
TCompatibilityRuleConstructor{
.Application = "nbs",
.LowerLimit = TVersionConstructor{ .Year = 23, .Major = 3 },
.UpperLimit = TVersionConstructor{ .Year = 24, .Major = 1 },
},
}
}.ToPB();
}
11 changes: 11 additions & 0 deletions ydb/apps/version/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
LIBRARY(version_definition)

SRCS(
version_definition.cpp
)

PEERDIR(
ydb/core/driver_lib/version
)

END()
1 change: 1 addition & 0 deletions ydb/apps/ydbd/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ IF (ARCH_X86_64)
ENDIF()

PEERDIR(
ydb/apps/version
ydb/core/driver_lib/run
ydb/core/protos
ydb/core/security
Expand Down
1 change: 1 addition & 0 deletions ydb/core/actorlib_impl/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ELSE()
ENDIF()

PEERDIR(
ydb/apps/version
ydb/library/actors/core
ydb/library/actors/interconnect
library/cpp/getopt
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/incrhuge/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ ELSE()
ENDIF()

PEERDIR(
ydb/apps/version
ydb/library/actors/protos
ydb/core/blobstorage
ydb/core/blobstorage/incrhuge
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_blobstorage/lib/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ SRCS(
PEERDIR(
library/cpp/digest/md5
library/cpp/testing/unittest
ydb/apps/version
ydb/core/base
ydb/core/blob_depot
ydb/core/blobstorage/backpressure
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_group/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ ELSE()
ENDIF()

PEERDIR(
ydb/apps/version
ydb/library/actors/interconnect/mock
library/cpp/testing/unittest
ydb/core/blobstorage/crypto
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_mirror3of4/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ SIZE(MEDIUM)
TIMEOUT(600)

PEERDIR(
ydb/apps/version
ydb/library/actors/interconnect/mock
library/cpp/testing/unittest
ydb/core/blobstorage/backpressure
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_vdisk/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ SRCS(
)

PEERDIR(
ydb/apps/version
ydb/library/actors/protos
library/cpp/codecs
ydb/core/base
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_vdisk2/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ SRCS(
)

PEERDIR(
ydb/apps/version
library/cpp/testing/unittest
ydb/core/blobstorage/backpressure
ydb/core/blobstorage/groupinfo
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/vdisk/syncer/ya.make
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
LIBRARY()

PEERDIR(
ydb/apps/version
ydb/library/actors/core
ydb/library/actors/interconnect
library/cpp/monlib/service/pages
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/version/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SIZE(MEDIUM)

PEERDIR(
ydb/core/driver_lib/version
ydb/apps/version
)

END()
33 changes: 1 addition & 32 deletions ydb/core/driver_lib/version/version.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,14 @@ using EComponentId = NKikimrConfig::TCompatibilityRule;
using TComponentId = NKikimrConfig::TCompatibilityRule::EComponentId;

TCompatibilityInfo::TCompatibilityInfo() {
using TCurrentConstructor = TCompatibilityInfo::TProtoConstructor::TCurrentCompatibilityInfo;
using TStoredConstructor = TCompatibilityInfo::TProtoConstructor::TStoredCompatibilityInfo;
using TCompatibilityRuleConstructor = TCompatibilityInfo::TProtoConstructor::TCompatibilityRule;
using TVersionConstructor = TCompatibilityInfo::TProtoConstructor::TVersion;

/////////////////////////////////////////////////////////
// Current CompatibilityInfo
/////////////////////////////////////////////////////////

auto current = TCurrentConstructor{
.Application = "ydb",
.Version = TVersionConstructor{
.Year = 24,
.Major = 1,
},
.CanLoadFrom = {
TCompatibilityRuleConstructor{
.LowerLimit = TVersionConstructor{ .Year = 23, .Major = 4 },
.UpperLimit = TVersionConstructor{ .Year = 24, .Major = 1 },
},
},
.StoresReadableBy = {
TCompatibilityRuleConstructor{
.LowerLimit = TVersionConstructor{ .Year = 23, .Major = 4 },
.UpperLimit = TVersionConstructor{ .Year = 24, .Major = 1 },
},
},
.CanConnectTo = {
TCompatibilityRuleConstructor{
.LowerLimit = TVersionConstructor{ .Year = 23, .Major = 4 },
.UpperLimit = TVersionConstructor{ .Year = 24, .Major = 1 },
},
TCompatibilityRuleConstructor{
.Application = "nbs",
.LowerLimit = TVersionConstructor{ .Year = 23, .Major = 3 },
.UpperLimit = TVersionConstructor{ .Year = 24, .Major = 1 },
},
}
}.ToPB();
auto current = MakeCurrent();

bool success = CompleteFromTag(current);
Y_UNUSED(success);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/driver_lib/version/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class TCompatibilityInfo {

bool CompleteFromTag(NKikimrConfig::TCurrentCompatibilityInfo& current);

static NKikimrConfig::TCurrentCompatibilityInfo MakeCurrent();

NKikimrConfig::TStoredCompatibilityInfo MakeStored(TComponentId componentId) const;
NKikimrConfig::TStoredCompatibilityInfo MakeStored(TComponentId componentId,
const NKikimrConfig::TCurrentCompatibilityInfo* current) const;
Expand Down
123 changes: 60 additions & 63 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2006,23 +2006,68 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TTopicTabletTxs& topicTxs) {
TDatashardTxs datashardTxs;

std::vector<ui64> affectedShardsSet;
affectedShardsSet.reserve(datashardTasks.size());

for (auto& [shardId, tasks]: datashardTasks) {
auto [it, success] = datashardTxs.emplace(
shardId,
TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpTransaction>());

YQL_ENSURE(success, "unexpected duplicates in datashard transactions");
affectedShardsSet.emplace_back(shardId);
NKikimrTxDataShard::TKqpTransaction* dsTxs = it->second;
dsTxs->MutableTasks()->Reserve(tasks.size());
for (auto& task: tasks) {
dsTxs->AddTasks()->Swap(task);
}
}

// Note: when locks map is present it will be mutated to avoid copying data
auto& locksMap = Request.DataShardLocks;
if (!locksMap.empty()) {
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback);
}

// Materialize (possibly empty) txs for all shards with locks (either commit or rollback)
for (auto& [shardId, locksList] : locksMap) {
YQL_ENSURE(!locksList.empty(), "unexpected empty locks list in DataShardLocks");

auto it = datashardTxs.find(shardId);
if (it == datashardTxs.end()) {
auto [emplaced, success] = datashardTxs.emplace(
shardId,
TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpTransaction>());

YQL_ENSURE(success, "unexpected failure to emplace a datashard transaction");
it = emplaced;
}

NKikimrTxDataShard::TKqpTransaction* tx = it->second;
switch (Request.LocksOp) {
case ELocksOp::Commit:
tx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
break;
case ELocksOp::Rollback:
tx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Rollback);
break;
case ELocksOp::Unspecified:
break;
}

// Move lock descriptions to the datashard tx
auto* protoLocks = tx->MutableLocks()->MutableLocks();
protoLocks->Reserve(locksList.size());
bool hasWrites = false;
for (auto& lock : locksList) {
hasWrites = hasWrites || lock.GetHasWrites();
protoLocks->Add(std::move(lock));
}
locksList.clear();

// When locks with writes are committed this commits accumulated effects
if (Request.LocksOp == ELocksOp::Commit && hasWrites) {
ShardsWithEffects.insert(shardId);
YQL_ENSURE(!ReadOnlyTx);
}
}

Request.TopicOperations.BuildTopicTxs(topicTxs);

const bool needRollback = Request.LocksOp == ELocksOp::Rollback;
Expand All @@ -2042,7 +2087,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// TODO: add support in the future
topicTxs.empty() &&
// We only want to use volatile transactions for multiple shards
(affectedShardsSet.size() + topicTxs.size()) > 1 &&
(datashardTxs.size() + topicTxs.size()) > 1 &&
// We cannot use volatile transactions with persistent channels
// Note: currently persistent channels are never used
!HasPersistentChannels);
Expand All @@ -2055,30 +2100,29 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// Transactions with topics must always use generic readsets
!topicTxs.empty());

if (auto locksMap = Request.DataShardLocks;
!locksMap.empty() ||
VolatileTx ||
if (!locksMap.empty() || VolatileTx ||
Request.TopicOperations.HasReadOperations())
{
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback || VolatileTx);

bool needCommit = Request.LocksOp == ELocksOp::Commit || VolatileTx;

auto locksOp = needCommit
? NKikimrDataEvents::TKqpLocks::Commit
: NKikimrDataEvents::TKqpLocks::Rollback;

absl::flat_hash_set<ui64> sendingShardsSet;
absl::flat_hash_set<ui64> receivingShardsSet;

// Gather shards that need to send/receive readsets (shards with effects)
if (needCommit) {
for (auto& shardId: affectedShardsSet) {
for (auto& [shardId, tx] : datashardTxs) {
if (tx->HasLocks()) {
// Locks may be broken so shards with locks need to send readsets
sendingShardsSet.insert(shardId);
}
if (ShardsWithEffects.contains(shardId)) {
// Volatile transactions may abort effects, so they send readsets
if (VolatileTx) {
sendingShardsSet.insert(shardId);
}
// Effects are only applied when all locks are valid
receivingShardsSet.insert(shardId);
}
}
Expand All @@ -2093,44 +2137,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

// Gather locks that need to be committed or erased
for (auto& [shardId, locksList] : locksMap) {
NKikimrTxDataShard::TKqpTransaction* tx = nullptr;
auto it = datashardTxs.find(shardId);
if (it != datashardTxs.end()) {
tx = it->second;
} else {
auto [eIt, success] = datashardTxs.emplace(
shardId,
TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpTransaction>());
tx = eIt->second;
}

tx->MutableLocks()->SetOp(locksOp);

if (!locksList.empty()) {
auto* protoLocks = tx->MutableLocks()->MutableLocks();
protoLocks->Reserve(locksList.size());
bool hasWrites = false;
for (auto& lock : locksList) {
hasWrites = hasWrites || lock.GetHasWrites();
protoLocks->Add()->Swap(&lock);
}

if (needCommit) {
// We also send the result on commit
sendingShardsSet.insert(shardId);

if (hasWrites) {
// Tx with uncommitted changes can be aborted due to conflicts,
// so shards with write locks should receive readsets
receivingShardsSet.insert(shardId);
YQL_ENSURE(!ReadOnlyTx);
}
}
}
}

// Encode sending/receiving shards in tx bodies
if (needCommit) {
NProtoBuf::RepeatedField<ui64> sendingShards(sendingShardsSet.begin(), sendingShardsSet.end());
NProtoBuf::RepeatedField<ui64> receivingShards(receivingShardsSet.begin(), receivingShardsSet.end());
Expand All @@ -2139,23 +2146,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
std::sort(receivingShards.begin(), receivingShards.end());

for (auto& [shardId, shardTx] : datashardTxs) {
shardTx->MutableLocks()->SetOp(locksOp);
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
}

for (auto& [_, tx] : topicTxs) {
switch (locksOp) {
case NKikimrDataEvents::TKqpLocks::Commit:
tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
break;
case NKikimrDataEvents::TKqpLocks::Rollback:
tx.SetOp(NKikimrPQ::TDataTransaction::Rollback);
break;
case NKikimrDataEvents::TKqpLocks::Unspecified:
break;
}

tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
*tx.MutableSendingShards() = sendingShards;
*tx.MutableReceivingShards() = receivingShards;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/mind/bscontroller/ut_selfheal/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ SRCS(
)

PEERDIR(
ydb/apps/version
ydb/core/blobstorage/dsproxy/mock
ydb/core/blobstorage/pdisk/mock
ydb/core/mind/bscontroller
Expand Down
1 change: 1 addition & 0 deletions ydb/core/testlib/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ SRCS(
)

PEERDIR(
ydb/apps/version
ydb/library/actors/testlib
library/cpp/testing/unittest
ydb/core/base
Expand Down
Loading

0 comments on commit 8faad73

Please sign in to comment.