Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[AN-146] Emit VM cost for GCP Batch" #7669

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ object Dependencies {
private val metrics3StatsdV = "4.2.0"
private val mockFtpServerV = "3.0.0"
private val mockitoV = "3.12.4"
private val mockitoInlineV = "2.8.9"
private val mockserverNettyV = "5.14.0"
private val mouseV = "1.0.11"

Expand Down Expand Up @@ -626,8 +625,7 @@ object Dependencies {
"org.scalatest" %% "scalatest" % scalatestV,
// Use mockito Java DSL directly instead of the numerous and often hard to keep updated Scala DSLs.
// See also scaladoc in common.mock.MockSugar and that trait's various usages.
"org.mockito" % "mockito-core" % mockitoV,
"org.mockito" % "mockito-inline" % mockitoInlineV
"org.mockito" % "mockito-core" % mockitoV
) ++ slf4jBindingDependencies // During testing, add an slf4j binding for _all_ libraries.

val kindProjectorPlugin = "org.typelevel" % "kind-projector" % kindProjectorV cross CrossVersion.full
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@
case event if event.name == CallMetadataKeys.VmEndTime => event.offsetDateTime
}

override def extractVmInfoFromRunState(pollStatus: RunStatus): Option[InstantiatedVmInfo] =
pollStatus.instantiatedVmInfo

override def handleVmCostLookup(vmInfo: InstantiatedVmInfo) = {
val request = GcpCostLookupRequest(vmInfo, self)
params.serviceRegistry ! request
Expand All @@ -72,7 +69,6 @@
}

override def receive: Receive = {
case costResponse: GcpCostLookupResponse => handleCostResponse(costResponse)
case message: PollResultMessage =>
message match {
case ProcessThisPollResult(pollResult: RunStatus) => processPollResult(pollResult)
Expand All @@ -97,4 +93,5 @@

override def params: PollMonitorParameters = pollMonitorParameters

override def extractVmInfoFromRunState(pollStatus: RunStatus): Option[InstantiatedVmInfo] = Option.empty // TODO

Check warning on line 96 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala#L96

Added line #L96 was not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -1025,18 +1025,6 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
} yield status
}

override val pollingResultMonitorActor: Option[ActorRef] = Option(
context.actorOf(
BatchPollResultMonitorActor.props(serviceRegistryActor,
workflowDescriptor,
jobDescriptor,
validatedRuntimeAttributes,
platform,
jobLogger
)
)
)

override def isTerminal(runStatus: RunStatus): Boolean =
runStatus match {
case _: RunStatus.TerminalRunStatus => true
Expand Down Expand Up @@ -1082,7 +1070,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
Future.fromTry {
Try {
runStatus match {
case RunStatus.Aborted(_, _) => AbortedExecutionHandle
case RunStatus.Aborted(_) => AbortedExecutionHandle
case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus)
case unknown =>
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cromwell.backend.google.batch.api.request

import com.google.api.gax.rpc.{ApiException, StatusCode}
import com.google.cloud.batch.v1.AllocationPolicy.ProvisioningModel
import com.google.cloud.batch.v1._
import com.typesafe.scalalogging.LazyLogging
import cromwell.backend.google.batch.actors.BatchApiAbortClient.{
Expand All @@ -12,8 +11,6 @@ import cromwell.backend.google.batch.api.BatchApiRequestManager._
import cromwell.backend.google.batch.api.{BatchApiRequestManager, BatchApiResponse}
import cromwell.backend.google.batch.models.{GcpBatchExitCode, RunStatus}
import cromwell.core.ExecutionEvent
import cromwell.services.cost.InstantiatedVmInfo
import cromwell.services.metadata.CallMetadataKeys

import scala.annotation.unused
import scala.concurrent.{ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -139,32 +136,14 @@ object BatchRequestExecutor {
)
lazy val exitCode = findBatchExitCode(events)

// Get vm info for this job
val allocationPolicy = job.getAllocationPolicy

// Get instances that can be created with this AllocationPolicy, only instances[0] is supported
val instancePolicy = allocationPolicy.getInstances(0).getPolicy
val machineType = instancePolicy.getMachineType
val preemtible = instancePolicy.getProvisioningModelValue == ProvisioningModel.PREEMPTIBLE.getNumber

// location list = [regions/us-central1, zones/us-central1-b], region is the first element
val location = allocationPolicy.getLocation.getAllowedLocationsList.get(0)
val region =
if (location.isEmpty)
"us-central1"
else
location.split("/").last

val instantiatedVmInfo = Some(InstantiatedVmInfo(region, machineType, preemtible))

if (job.getStatus.getState == JobStatus.State.SUCCEEDED) {
RunStatus.Success(events, instantiatedVmInfo)
RunStatus.Success(events)
} else if (job.getStatus.getState == JobStatus.State.RUNNING) {
RunStatus.Running(events, instantiatedVmInfo)
RunStatus.Running(events)
} else if (job.getStatus.getState == JobStatus.State.FAILED) {
RunStatus.Failed(exitCode, events, instantiatedVmInfo)
RunStatus.Failed(exitCode, events)
} else {
RunStatus.Initializing(events, instantiatedVmInfo)
RunStatus.Initializing(events)
}
}

Expand All @@ -173,20 +152,12 @@ object BatchRequestExecutor {
GcpBatchExitCode.fromEventMessage(e.name.toLowerCase)
}.headOption

private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] = {
val startedRegex = ".*SCHEDULED to RUNNING.*".r
val endedRegex = ".*RUNNING to.*".r // can be SUCCEEDED or FAILED
private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] =
events.map { e =>
val time = java.time.Instant
.ofEpochSecond(e.getEventTime.getSeconds, e.getEventTime.getNanos.toLong)
.atOffset(java.time.ZoneOffset.UTC)
val eventType = e.getDescription match {
case startedRegex() => CallMetadataKeys.VmStartTime
case endedRegex() => CallMetadataKeys.VmEndTime
case _ => e.getType
}
ExecutionEvent(name = eventType, offsetDateTime = time)
ExecutionEvent(name = e.getDescription, offsetDateTime = time)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,24 @@
package cromwell.backend.google.batch.models

import cromwell.core.ExecutionEvent
import cromwell.services.cost.InstantiatedVmInfo

sealed trait RunStatus {
def eventList: Seq[ExecutionEvent]
def toString: String

val instantiatedVmInfo: Option[InstantiatedVmInfo]
}

object RunStatus {

case class Initializing(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty)
extends RunStatus { override def toString = "Initializing" }
case class AwaitingCloudQuota(eventList: Seq[ExecutionEvent],
instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty
) extends RunStatus {
case class Initializing(eventList: Seq[ExecutionEvent]) extends RunStatus { override def toString = "Initializing" }
case class AwaitingCloudQuota(eventList: Seq[ExecutionEvent]) extends RunStatus {
override def toString = "AwaitingCloudQuota"
}

case class Running(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty)
extends RunStatus { override def toString = "Running" }
case class Running(eventList: Seq[ExecutionEvent]) extends RunStatus { override def toString = "Running" }

sealed trait TerminalRunStatus extends RunStatus

case class Success(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty)
extends TerminalRunStatus {
case class Success(eventList: Seq[ExecutionEvent]) extends TerminalRunStatus {
override def toString = "Success"
}

Expand All @@ -37,8 +29,7 @@ object RunStatus {

final case class Failed(
exitCode: Option[GcpBatchExitCode],
eventList: Seq[ExecutionEvent],
instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty
eventList: Seq[ExecutionEvent]
) extends UnsuccessfulRunStatus {
override def toString = "Failed"

Expand Down Expand Up @@ -67,9 +58,7 @@ object RunStatus {
}
}

final case class Aborted(eventList: Seq[ExecutionEvent],
instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty
) extends UnsuccessfulRunStatus {
final case class Aborted(eventList: Seq[ExecutionEvent]) extends UnsuccessfulRunStatus {
override def toString = "Aborted"

override val exitCode: Option[GcpBatchExitCode] = None
Expand Down

This file was deleted.

Loading
Loading