From 66acc0d1b11b2ca98c314b60cbb5582e7f97955b Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Tue, 13 Feb 2024 14:32:29 +0000 Subject: [PATCH] use actor system --- .../libs/checkpoint_storage/storage_proxy.cpp | 90 +++++++++---------- .../ydb_checkpoint_storage.cpp | 6 +- 2 files changed, 48 insertions(+), 48 deletions(-) diff --git a/ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp b/ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp index 71eb8d760ef3..6200195af530 100644 --- a/ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp +++ b/ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp @@ -141,16 +141,16 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvRegisterCoordinatorRequest:: .Apply([coordinatorId = event->CoordinatorId, cookie = ev->Cookie, sender = ev->Sender, - context = TActivationContext::AsActorContext()] (const NThreading::TFuture& issuesFuture) { + actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture& issuesFuture) { auto response = std::make_unique(); response->Issues = issuesFuture.GetValue(); if (response->Issues) { - LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString()) + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString()) } else { - LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] Graph registered") + LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] Graph registered") } - LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse") - context.Send(sender, response.release(), 0, cookie); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse") + actorSystem->Send(sender, response.release(), 0, cookie); }); } @@ -164,14 +164,14 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt cookie = ev->Cookie, sender = ev->Sender, totalGraphCheckpointsSizeLimit = Config.GetStateStorageLimits().GetMaxGraphCheckpointsSizeBytes(), - context = TActivationContext::AsActorContext()] + actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture& resultFuture) { auto result = resultFuture.GetValue(); auto issues = result.second; if (issues) { - LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString()); - context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie); + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString()); + actorSystem->Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie); return false; } @@ -181,10 +181,10 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt TStringStream ss; ss << "[" << coordinatorId << "] [" << checkpointId << "] Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize; auto message = ss.Str(); - LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, message) + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, message) issues.AddIssue(message); - LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse"); - context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse"); + actorSystem->Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie); return false; } return true; @@ -209,7 +209,7 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt coordinatorId = event->CoordinatorId, cookie = ev->Cookie, sender = ev->Sender, - context = TActivationContext::AsActorContext()] + actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture& resultFuture) { if (!resultFuture.Initialized()) { // didn't pass the size limit check return; @@ -218,12 +218,12 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt auto issues = result.second; auto response = std::make_unique(checkpointId, std::move(issues), result.first); if (response->Issues) { - LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString()); + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString()); } else { - LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created"); + LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created"); } - LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse"); - context.Send(sender, response.release(), 0, cookie); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse"); + actorSystem->Send(sender, response.release(), 0, cookie); }); } @@ -235,17 +235,17 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvSetCheckpointPendingCommitSt coordinatorId = event->CoordinatorId, cookie = ev->Cookie, sender = ev->Sender, - context = TActivationContext::AsActorContext()] + actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture& issuesFuture) { auto issues = issuesFuture.GetValue(); auto response = std::make_unique(checkpointId, std::move(issues)); if (response->Issues) { - LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString()) + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString()) } else { - LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'") + LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'") } - LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse") - context.Send(sender, response.release(), 0, cookie); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse") + actorSystem->Send(sender, response.release(), 0, cookie); }); } @@ -259,22 +259,22 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCompleteCheckpointRequest::T sender = ev->Sender, gcEnabled = Config.GetCheckpointGarbageConfig().GetEnabled(), actorGC = ActorGC, - context = TActivationContext::AsActorContext()] + actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture& issuesFuture) { auto issues = issuesFuture.GetValue(); auto response = std::make_unique(checkpointId, std::move(issues)); if (response->Issues) { - LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString()) + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString()) } else { - LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'") + LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'") if (gcEnabled) { auto request = std::make_unique(coordinatorId, checkpointId); - LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded") - context.Send(actorGC, request.release(), 0); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded") + actorSystem->Send(actorGC, request.release(), 0); } } - LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse") - context.Send(sender, response.release(), 0, cookie); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse") + actorSystem->Send(sender, response.release(), 0, cookie); }); } @@ -286,16 +286,16 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvAbortCheckpointRequest::TPtr coordinatorId = event->CoordinatorId, cookie = ev->Cookie, sender = ev->Sender, - context = TActivationContext::AsActorContext()] (const NThreading::TFuture& issuesFuture) { + actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture& issuesFuture) { auto issues = issuesFuture.GetValue(); auto response = std::make_unique(checkpointId, std::move(issues)); if (response->Issues) { - LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString()) + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString()) } else { - LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted") + LOG_STREAMS_STORAGE_SERVICE_AS_INFO(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted") } - LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse") - context.Send(sender, response.release(), 0, cookie); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse") + actorSystem->Send(sender, response.release(), 0, cookie); }); } @@ -306,14 +306,14 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataReques .Apply([graphId = event->GraphId, cookie = ev->Cookie, sender = ev->Sender, - context = TActivationContext::AsActorContext()] (const NThreading::TFuture& futureResult) { + actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture& futureResult) { auto result = futureResult.GetValue(); auto response = std::make_unique(result.first, result.second); if (response->Issues) { - LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString()) + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString()) } - LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse") - context.Send(sender, response.release(), 0, cookie); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse") + actorSystem->Send(sender, response.release(), 0, cookie); }); } @@ -343,7 +343,7 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev) cookie = ev->Cookie, sender = ev->Sender, stateSize = stateSize, - context = TActivationContext::AsActorContext()](const NThreading::TFuture& futureResult) { + actorSystem = TActivationContext::ActorSystem()](const NThreading::TFuture& futureResult) { const auto& issues = futureResult.GetValue(); auto response = std::make_unique(); response->Record.MutableCheckpoint()->SetGeneration(checkpointId.CoordinatorGeneration); @@ -352,13 +352,13 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev) response->Record.SetTaskId(taskId); if (issues) { - LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString()) + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString()) response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR); } else { response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK); } - LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult") - context.Send(sender, response.release(), 0, cookie); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << checkpointId << "] Send TEvSaveTaskStateResult") + actorSystem->Send(sender, response.release(), 0, cookie); }); } @@ -373,16 +373,16 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) { taskIds = event->TaskIds, cookie = ev->Cookie, sender = ev->Sender, - context = TActivationContext::AsActorContext()](const NThreading::TFuture& resultFuture) { + actorSystem = TActivationContext::ActorSystem()](const NThreading::TFuture& resultFuture) { auto result = resultFuture.GetValue(); auto response = std::make_unique(checkpointId, result.second, generation); std::swap(response->States, result.first); if (response->Issues) { - LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString()); + LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString()); } - LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << checkpointId << "] Send TEvGetTaskStateResult"); - context.Send(sender, response.release(), 0, cookie); + LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << checkpointId << "] Send TEvGetTaskStateResult"); + actorSystem->Send(sender, response.release(), 0, cookie); }); } diff --git a/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp b/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp index 3d71e8809ac1..c270f8e7201f 100644 --- a/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp +++ b/ydb/core/fq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp @@ -1056,7 +1056,7 @@ TFuture TCheckpointStor auto result = MakeIntrusive(); auto future = YdbConnection->TableClient.RetryOperation( [prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result, - context = NActors::TActivationContext::AsActorContext()](TSession session) { + actorSystem = NActors::TActivationContext::ActorSystem()](TSession session) { NYdb::TParamsBuilder paramsBuilder; paramsBuilder.AddParam("$graph_id").String(graphId).Build(); auto params = paramsBuilder.Build(); @@ -1078,12 +1078,12 @@ TFuture TCheckpointStor params, thisPtr->DefaultExecDataQuerySettings()) .Apply( - [graphId, result, context](const TFuture& future) { + [graphId, result, actorSystem](const TFuture& future) { const auto& queryResult = future.GetValue(); auto status = TStatus(queryResult); if (!queryResult.IsSuccess()) { - LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(context, TStringBuilder() << "GetTotalCheckpointsStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString()); return status; + LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(*actorSystem, TStringBuilder() << "GetTotalCheckpointsStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString()); return status; } TResultSetParser parser = queryResult.GetResultSetParser(0);