Skip to content

Commit

Permalink
Merge pull request #11430 from uzhastik/24_3_merge_16
Browse files Browse the repository at this point in the history
24 3 merge 16
  • Loading branch information
maximyurchuk authored Nov 10, 2024
2 parents 346eb69 + ac138d9 commit 9e1fcd3
Show file tree
Hide file tree
Showing 58 changed files with 989 additions and 97 deletions.
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ void TNodeWarden::RemoveDrivesWithBadSerialsAndReport(TVector<NPDisk::TDriveData
}

TVector<NPDisk::TDriveData> TNodeWarden::ListLocalDrives() {
if (!AppData()->FeatureFlags.GetEnableDriveSerialsDiscovery()) {
return {};
}

TStringStream details;
TVector<NPDisk::TDriveData> drives = ListDevicesWithPartlabel(details);

Expand Down
25 changes: 24 additions & 1 deletion ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ namespace NKikimr {
TEvResumeForce *ResumeForceToken = nullptr;
TInstant ReplicationEndTime;
bool UnrecoveredNonphantomBlobs = false;
bool RequestedReplicationToken = false;
bool HoldingReplicationToken = false;

TWatchdogTimer<TEvReplCheckProgress> ReplProgressWatchdog;

Expand Down Expand Up @@ -287,6 +289,12 @@ namespace NKikimr {
case Plan:
// this is a first quantum of replication, so we have to register it in the broker
State = AwaitToken;
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken);
if (RequestedReplicationToken) {
STLOG(PRI_CRIT, BS_REPL, BSVR38, ReplCtx->VCtx->VDiskLogPrefix << "excessive replication token requested");
break;
}
RequestedReplicationToken = true;
if (!Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(ReplCtx->VDiskCfg->BaseInfo.PDiskId))) {
HandleReplToken();
}
Expand All @@ -303,6 +311,10 @@ namespace NKikimr {
}

void HandleReplToken() {
Y_ABORT_UNLESS(RequestedReplicationToken);
RequestedReplicationToken = false;
HoldingReplicationToken = true;

// switch to replication state
Transition(AwaitToken, Replication);
if (!ResumeIfReady()) {
Expand Down Expand Up @@ -408,6 +420,9 @@ namespace NKikimr {
if (State == WaitQueues || State == Replication) {
// release token as we have finished replicating
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken);
Y_DEBUG_ABORT_UNLESS(HoldingReplicationToken);
HoldingReplicationToken = false;
}
ResetReplProgressTimer(true);

Expand Down Expand Up @@ -635,7 +650,15 @@ namespace NKikimr {

// return replication token if we have one
if (State == AwaitToken || State == WaitQueues || State == Replication) {
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
Y_DEBUG_ABORT_UNLESS(RequestedReplicationToken || HoldingReplicationToken);
if (RequestedReplicationToken || HoldingReplicationToken) {
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
}
} else {
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken && !HoldingReplicationToken);
if (RequestedReplicationToken || HoldingReplicationToken) {
STLOG(PRI_CRIT, BS_REPL, BSVR37, ReplCtx->VCtx->VDiskLogPrefix << "stuck replication token");
}
}

if (ReplJobActorId) {
Expand Down
22 changes: 17 additions & 5 deletions ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ namespace NKikimr {
ui64 NextReceiveCookie;
TResultQueue ResultQueue;
std::shared_ptr<TMessageRelevanceTracker> Tracker = std::make_shared<TMessageRelevanceTracker>();
bool Terminated = false;

TQueue<std::unique_ptr<TEvBlobStorage::TEvVGet>> SchedulerRequestQ;
THashMap<ui64, TReplMemTokenId> RequestTokens;
Expand Down Expand Up @@ -227,9 +228,7 @@ namespace NKikimr {
PrefetchDataSize = 0;
RequestFromVDiskProxyPending = false;
if (Finished) {
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
RequestTokens.clear();
return PassAway(); // TODO(alexvru): check correctness of invocations
return PassAway();
}
}
// send request(s) if prefetch queue is not full
Expand Down Expand Up @@ -297,6 +296,9 @@ namespace NKikimr {
if (msg->Record.GetCookie() == NextReceiveCookie) {
ui64 cookie = NextReceiveCookie;
ProcessResult(msg);
if (Terminated) {
return;
}
ReleaseMemToken(cookie);
while (!ResultQueue.empty()) {
const TQueueItem& top = ResultQueue.top();
Expand All @@ -305,6 +307,9 @@ namespace NKikimr {
}
ui64 cookie = NextReceiveCookie;
ProcessResult(top.get());
if (Terminated) {
return;
}
ReleaseMemToken(cookie);
ResultQueue.pop();
}
Expand All @@ -314,6 +319,7 @@ namespace NKikimr {
}

void ReleaseMemToken(ui64 cookie) {
Y_ABORT_UNLESS(!Terminated);
if (RequestTokens) {
auto it = RequestTokens.find(cookie);
Y_ABORT_UNLESS(it != RequestTokens.end());
Expand Down Expand Up @@ -428,6 +434,13 @@ namespace NKikimr {
}
}

void PassAway() override {
Y_ABORT_UNLESS(!Terminated);
Terminated = true;
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
TActorBootstrapped::PassAway();
}

STRICT_STFUNC(StateFunc,
hFunc(TEvReplProxyNext, Handle)
hFunc(TEvReplMemToken, Handle)
Expand All @@ -446,8 +459,7 @@ namespace NKikimr {
TTrackableVector<TVDiskProxy::TScheduledBlob>&& ids,
const TVDiskID& vdiskId,
const TActorId& serviceId)
: TActorBootstrapped<TVDiskProxyActor>()
, ReplCtx(std::move(replCtx))
: ReplCtx(std::move(replCtx))
, GType(ReplCtx->VCtx->Top->GType)
, Ids(std::move(ids))
, VDiskId(vdiskId)
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
{
Y_UNUSED(config);

if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE)
if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE &&
*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO)
return false;

if (txCtx.GetSnapshot().IsValid())
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
break;

case Ydb::Table::TransactionSettings::kSnapshotReadOnly:
// TODO: (KIKIMR-3374) Use separate isolation mode to avoid optimistic locks.
EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO;
Readonly = true;
break;

Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1039,8 +1039,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) {
// Special case for infinity
if (lhs.Ranges->GetRightBorder().first->GetCells().empty() || rhs.Ranges->GetRightBorder().first->GetCells().empty()) {
YQL_ENSURE(!lhs.Ranges->GetRightBorder().first->GetCells().empty() || !rhs.Ranges->GetRightBorder().first->GetCells().empty());
return rhs.Ranges->GetRightBorder().first->GetCells().empty();
return !lhs.Ranges->GetRightBorder().first->GetCells().empty();
}
return CompareTypedCellVectors(
lhs.Ranges->GetRightBorder().first->GetCells().data(),
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
request.SetTxId(TxId);
if (LockTxId) {
request.SetLockTxId(*LockTxId);
request.SetLockNodeId(LockNodeId);
}
request.SetLockNodeId(LockNodeId);
ActorIdToProto(ExecuterId, request.MutableExecuterActorId());

if (Deadline) {
Expand Down
33 changes: 27 additions & 6 deletions ydb/core/kqp/gateway/behaviour/view/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ void FillCreateViewProposal(NKikimrSchemeOp::TModifyScheme& modifyScheme,
const auto pathPair = SplitPathByDb(settings.GetObjectId(), context.GetDatabase());
modifyScheme.SetWorkingDir(pathPair.first);
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateView);
modifyScheme.SetFailedOnAlreadyExists(!settings.GetExistingOk());

auto& viewDesc = *modifyScheme.MutableCreateView();
viewDesc.SetName(pathPair.second);
Expand All @@ -77,16 +78,20 @@ void FillDropViewProposal(NKikimrSchemeOp::TModifyScheme& modifyScheme,
const auto pathPair = SplitPathByObjectId(settings.GetObjectId());
modifyScheme.SetWorkingDir(pathPair.first);
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropView);
modifyScheme.SetSuccessOnNotExist(settings.GetMissingOk());

auto& drop = *modifyScheme.MutableDrop();
drop.SetName(pathPair.second);
}

NThreading::TFuture<TYqlConclusionStatus> SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request,
TActorSystem* actorSystem,
bool failOnAlreadyExists) {
bool failedOnAlreadyExists,
bool successOnNotExist) {
const auto promiseScheme = NThreading::NewPromise<NKqp::TSchemeOpRequestHandler::TResult>();
IActor* const requestHandler = new TSchemeOpRequestHandler(request, promiseScheme, failOnAlreadyExists);
IActor* const requestHandler = new TSchemeOpRequestHandler(
request, promiseScheme, failedOnAlreadyExists, successOnNotExist
);
actorSystem->Register(requestHandler);
return promiseScheme.GetFuture().Apply([](const NThreading::TFuture<NKqp::TSchemeOpRequestHandler::TResult>& opResult) {
if (opResult.HasValue()) {
Expand All @@ -109,7 +114,12 @@ NThreading::TFuture<TYqlConclusionStatus> CreateView(const NYql::TCreateObjectSe
auto& schemeTx = *proposal->Record.MutableTransaction()->MutableModifyScheme();
FillCreateViewProposal(schemeTx, settings, context.GetExternalData());

return SendSchemeRequest(proposal.Release(), context.GetExternalData().GetActorSystem(), true);
return SendSchemeRequest(
proposal.Release(),
context.GetExternalData().GetActorSystem(),
schemeTx.GetFailedOnAlreadyExists(),
schemeTx.GetSuccessOnNotExist()
);
}

NThreading::TFuture<TYqlConclusionStatus> DropView(const NYql::TDropObjectSettings& settings,
Expand All @@ -122,7 +132,12 @@ NThreading::TFuture<TYqlConclusionStatus> DropView(const NYql::TDropObjectSettin
auto& schemeTx = *proposal->Record.MutableTransaction()->MutableModifyScheme();
FillDropViewProposal(schemeTx, settings);

return SendSchemeRequest(proposal.Release(), context.GetExternalData().GetActorSystem(), false);
return SendSchemeRequest(
proposal.Release(),
context.GetExternalData().GetActorSystem(),
schemeTx.GetFailedOnAlreadyExists(),
schemeTx.GetSuccessOnNotExist()
);
}

void PrepareCreateView(NKqpProto::TKqpSchemeOperation& schemeOperation,
Expand Down Expand Up @@ -214,10 +229,10 @@ NThreading::TFuture<TYqlConclusionStatus> TViewManager::ExecutePrepared(const NK
switch (schemeOperation.GetOperationCase()) {
case NKqpProto::TKqpSchemeOperation::kCreateView:
schemeTx.CopyFrom(schemeOperation.GetCreateView());
return SendSchemeRequest(proposal.Release(), context.GetActorSystem(), true);
break;
case NKqpProto::TKqpSchemeOperation::kDropView:
schemeTx.CopyFrom(schemeOperation.GetDropView());
return SendSchemeRequest(proposal.Release(), context.GetActorSystem(), false);
break;
default:
return NThreading::MakeFuture(TYqlConclusionStatus::Fail(
TStringBuilder()
Expand All @@ -226,6 +241,12 @@ NThreading::TFuture<TYqlConclusionStatus> TViewManager::ExecutePrepared(const NK
)
);
}
return SendSchemeRequest(
proposal.Release(),
context.GetActorSystem(),
schemeTx.GetFailedOnAlreadyExists(),
schemeTx.GetSuccessOnNotExist()
);
}

}
14 changes: 13 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,18 @@ TMaybeNode<TCoLambda> ExtractTopSortKeySelector(TExprBase node, const NYql::TPar
return {};
}

bool IsIdLambda(TExprBase body) {
if (auto cond = body.Maybe<TCoConditionalValueBase>()) {
if (auto boolLit = cond.Cast().Predicate().Maybe<TCoBool>()) {
return boolLit.Literal().Cast().Value() == "true" && cond.Value().Maybe<TCoArgument>();
}
}
if (body.Maybe<TCoArgument>()) {
return true;
}
return false;
}

} // namespace

TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
Expand Down Expand Up @@ -305,7 +317,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
const NYql::TKikimrTableDescription & tableDesc) -> TIndexComparisonKey
{
return std::make_tuple(
keySelector.IsValid() && IsSortKeyPrimary(keySelector.Cast(), tableDesc),
keySelector.IsValid() && IsSortKeyPrimary(keySelector.Cast(), tableDesc) && IsIdLambda(TCoLambda(buildResult.PrunedLambda).Body()),
buildResult.PointPrefixLen >= descriptionKeyColumns,
buildResult.PointPrefixLen >= descriptionKeyColumns ? 0 : buildResult.PointPrefixLen,
buildResult.UsedPrefixLen >= descriptionKeyColumns,
Expand Down
Loading

0 comments on commit 9e1fcd3

Please sign in to comment.