Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-2704 Use ActorSystem() instead of AsActorContext() #1892

Merged
merged 1 commit into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 45 additions & 45 deletions ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,16 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvRegisterCoordinatorRequest::
.Apply([coordinatorId = event->CoordinatorId,
cookie = ev->Cookie,
sender = ev->Sender,
context = TActivationContext::AsActorContext()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
auto response = std::make_unique<TEvCheckpointStorage::TEvRegisterCoordinatorResponse>();
response->Issues = issuesFuture.GetValue();
if (response->Issues) {
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString())
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString())
} else {
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] Graph registered")
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] Graph registered")
}
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse")
context.Send(sender, response.release(), 0, cookie);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse")
actorSystem->Send(sender, response.release(), 0, cookie);
});
}

Expand All @@ -164,14 +164,14 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
cookie = ev->Cookie,
sender = ev->Sender,
totalGraphCheckpointsSizeLimit = Config.GetStateStorageLimits().GetMaxGraphCheckpointsSizeBytes(),
context = TActivationContext::AsActorContext()]
actorSystem = TActivationContext::ActorSystem()]
(const NThreading::TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult>& resultFuture) {
auto result = resultFuture.GetValue();
auto issues = result.second;

if (issues) {
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString());
context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString());
actorSystem->Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
return false;
}

Expand All @@ -181,10 +181,10 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
TStringStream ss;
ss << "[" << coordinatorId << "] [" << checkpointId << "] Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize;
auto message = ss.Str();
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, message)
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, message)
issues.AddIssue(message);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
actorSystem->Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
return false;
}
return true;
Expand All @@ -209,7 +209,7 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
coordinatorId = event->CoordinatorId,
cookie = ev->Cookie,
sender = ev->Sender,
context = TActivationContext::AsActorContext()]
actorSystem = TActivationContext::ActorSystem()]
(const NThreading::TFuture<ICheckpointStorage::TCreateCheckpointResult>& resultFuture) {
if (!resultFuture.Initialized()) { // didn't pass the size limit check
return;
Expand All @@ -218,12 +218,12 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
auto issues = result.second;
auto response = std::make_unique<TEvCheckpointStorage::TEvCreateCheckpointResponse>(checkpointId, std::move(issues), result.first);
if (response->Issues) {
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString());
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString());
} else {
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created");
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created");
}
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
context.Send(sender, response.release(), 0, cookie);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
actorSystem->Send(sender, response.release(), 0, cookie);
});
}

Expand All @@ -235,17 +235,17 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvSetCheckpointPendingCommitSt
coordinatorId = event->CoordinatorId,
cookie = ev->Cookie,
sender = ev->Sender,
context = TActivationContext::AsActorContext()]
actorSystem = TActivationContext::ActorSystem()]
(const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
auto issues = issuesFuture.GetValue();
auto response = std::make_unique<TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse>(checkpointId, std::move(issues));
if (response->Issues) {
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString())
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString())
} else {
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'")
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'")
}
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse")
context.Send(sender, response.release(), 0, cookie);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse")
actorSystem->Send(sender, response.release(), 0, cookie);
});
}

Expand All @@ -259,22 +259,22 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCompleteCheckpointRequest::T
sender = ev->Sender,
gcEnabled = Config.GetCheckpointGarbageConfig().GetEnabled(),
actorGC = ActorGC,
context = TActivationContext::AsActorContext()]
actorSystem = TActivationContext::ActorSystem()]
(const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
auto issues = issuesFuture.GetValue();
auto response = std::make_unique<TEvCheckpointStorage::TEvCompleteCheckpointResponse>(checkpointId, std::move(issues));
if (response->Issues) {
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString())
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString())
} else {
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'")
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'")
if (gcEnabled) {
auto request = std::make_unique<TEvCheckpointStorage::TEvNewCheckpointSucceeded>(coordinatorId, checkpointId);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded")
context.Send(actorGC, request.release(), 0);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded")
actorSystem->Send(actorGC, request.release(), 0);
}
}
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse")
context.Send(sender, response.release(), 0, cookie);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse")
actorSystem->Send(sender, response.release(), 0, cookie);
});
}

Expand All @@ -286,16 +286,16 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvAbortCheckpointRequest::TPtr
coordinatorId = event->CoordinatorId,
cookie = ev->Cookie,
sender = ev->Sender,
context = TActivationContext::AsActorContext()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
auto issues = issuesFuture.GetValue();
auto response = std::make_unique<TEvCheckpointStorage::TEvAbortCheckpointResponse>(checkpointId, std::move(issues));
if (response->Issues) {
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString())
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString())
} else {
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted")
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted")
}
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse")
context.Send(sender, response.release(), 0, cookie);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse")
actorSystem->Send(sender, response.release(), 0, cookie);
});
}

Expand All @@ -306,14 +306,14 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataReques
.Apply([graphId = event->GraphId,
cookie = ev->Cookie,
sender = ev->Sender,
context = TActivationContext::AsActorContext()] (const NThreading::TFuture<ICheckpointStorage::TGetCheckpointsResult>& futureResult) {
actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture<ICheckpointStorage::TGetCheckpointsResult>& futureResult) {
auto result = futureResult.GetValue();
auto response = std::make_unique<TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse>(result.first, result.second);
if (response->Issues) {
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString())
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString())
}
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse")
context.Send(sender, response.release(), 0, cookie);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse")
actorSystem->Send(sender, response.release(), 0, cookie);
});
}

Expand Down Expand Up @@ -343,7 +343,7 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev)
cookie = ev->Cookie,
sender = ev->Sender,
stateSize = stateSize,
context = TActivationContext::AsActorContext()](const NThreading::TFuture<NYql::TIssues>& futureResult) {
actorSystem = TActivationContext::ActorSystem()](const NThreading::TFuture<NYql::TIssues>& futureResult) {
const auto& issues = futureResult.GetValue();
auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration);
Expand All @@ -352,13 +352,13 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev)
response->Record.SetTaskId(taskId);

if (issues) {
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString())
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString())
response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR);
} else {
response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK);
}
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult")
context.Send(sender, response.release(), 0, cookie);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << checkpointId << "] Send TEvSaveTaskStateResult")
actorSystem->Send(sender, response.release(), 0, cookie);
});
}

Expand All @@ -373,16 +373,16 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) {
taskIds = event->TaskIds,
cookie = ev->Cookie,
sender = ev->Sender,
context = TActivationContext::AsActorContext()](const NThreading::TFuture<IStateStorage::TGetStateResult>& resultFuture) {
actorSystem = TActivationContext::ActorSystem()](const NThreading::TFuture<IStateStorage::TGetStateResult>& resultFuture) {
auto result = resultFuture.GetValue();

auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvGetTaskStateResult>(checkpointId, result.second, generation);
std::swap(response->States, result.first);
if (response->Issues) {
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString());
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString());
}
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << checkpointId << "] Send TEvGetTaskStateResult");
context.Send(sender, response.release(), 0, cookie);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << checkpointId << "] Send TEvGetTaskStateResult");
actorSystem->Send(sender, response.release(), 0, cookie);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> TCheckpointStor
auto result = MakeIntrusive<TGetTotalCheckpointsStateSizeContext>();
auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result,
context = NActors::TActivationContext::AsActorContext()](TSession session) {
actorSystem = NActors::TActivationContext::ActorSystem()](TSession session) {
NYdb::TParamsBuilder paramsBuilder;
paramsBuilder.AddParam("$graph_id").String(graphId).Build();
auto params = paramsBuilder.Build();
Expand All @@ -1078,12 +1078,12 @@ TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> TCheckpointStor
params,
thisPtr->DefaultExecDataQuerySettings())
.Apply(
[graphId, result, context](const TFuture<TDataQueryResult>& future) {
[graphId, result, actorSystem](const TFuture<TDataQueryResult>& future) {
const auto& queryResult = future.GetValue();
auto status = TStatus(queryResult);

if (!queryResult.IsSuccess()) {
LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(context, TStringBuilder() << "GetTotalCheckpointsStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString()); return status;
LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(*actorSystem, TStringBuilder() << "GetTotalCheckpointsStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString()); return status;
}

TResultSetParser parser = queryResult.GetResultSetParser(0);
Expand Down
Loading