Skip to content

Commit

Permalink
Revert "[AN-146] Emit VM cost for GCP Batch" (#7669)
Browse files Browse the repository at this point in the history
Co-authored-by: Adam Nichols <aednichols@gmail.com>
  • Loading branch information
lucymcnatt and aednichols authored Dec 10, 2024
1 parent 7ce1cc2 commit 860429b
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 419 deletions.
4 changes: 1 addition & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ object Dependencies {
private val metrics3StatsdV = "4.2.0"
private val mockFtpServerV = "3.0.0"
private val mockitoV = "3.12.4"
private val mockitoInlineV = "2.8.9"
private val mockserverNettyV = "5.14.0"
private val mouseV = "1.0.11"

Expand Down Expand Up @@ -626,8 +625,7 @@ object Dependencies {
"org.scalatest" %% "scalatest" % scalatestV,
// Use mockito Java DSL directly instead of the numerous and often hard to keep updated Scala DSLs.
// See also scaladoc in common.mock.MockSugar and that trait's various usages.
"org.mockito" % "mockito-core" % mockitoV,
"org.mockito" % "mockito-inline" % mockitoInlineV
"org.mockito" % "mockito-core" % mockitoV
) ++ slf4jBindingDependencies // During testing, add an slf4j binding for _all_ libraries.

val kindProjectorPlugin = "org.typelevel" % "kind-projector" % kindProjectorV cross CrossVersion.full
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters)
case event if event.name == CallMetadataKeys.VmEndTime => event.offsetDateTime
}

override def extractVmInfoFromRunState(pollStatus: RunStatus): Option[InstantiatedVmInfo] =
pollStatus.instantiatedVmInfo

override def handleVmCostLookup(vmInfo: InstantiatedVmInfo) = {
val request = GcpCostLookupRequest(vmInfo, self)
params.serviceRegistry ! request
Expand All @@ -72,7 +69,6 @@ class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters)
}

override def receive: Receive = {
case costResponse: GcpCostLookupResponse => handleCostResponse(costResponse)
case message: PollResultMessage =>
message match {
case ProcessThisPollResult(pollResult: RunStatus) => processPollResult(pollResult)
Expand All @@ -97,4 +93,5 @@ class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters)

override def params: PollMonitorParameters = pollMonitorParameters

override def extractVmInfoFromRunState(pollStatus: RunStatus): Option[InstantiatedVmInfo] = Option.empty // TODO
}
Original file line number Diff line number Diff line change
Expand Up @@ -1025,18 +1025,6 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
} yield status
}

override val pollingResultMonitorActor: Option[ActorRef] = Option(
context.actorOf(
BatchPollResultMonitorActor.props(serviceRegistryActor,
workflowDescriptor,
jobDescriptor,
validatedRuntimeAttributes,
platform,
jobLogger
)
)
)

override def isTerminal(runStatus: RunStatus): Boolean =
runStatus match {
case _: RunStatus.TerminalRunStatus => true
Expand Down Expand Up @@ -1082,7 +1070,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
Future.fromTry {
Try {
runStatus match {
case RunStatus.Aborted(_, _) => AbortedExecutionHandle
case RunStatus.Aborted(_) => AbortedExecutionHandle
case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus)
case unknown =>
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cromwell.backend.google.batch.api.request

import com.google.api.gax.rpc.{ApiException, StatusCode}
import com.google.cloud.batch.v1.AllocationPolicy.ProvisioningModel
import com.google.cloud.batch.v1._
import com.typesafe.scalalogging.LazyLogging
import cromwell.backend.google.batch.actors.BatchApiAbortClient.{
Expand All @@ -12,8 +11,6 @@ import cromwell.backend.google.batch.api.BatchApiRequestManager._
import cromwell.backend.google.batch.api.{BatchApiRequestManager, BatchApiResponse}
import cromwell.backend.google.batch.models.{GcpBatchExitCode, RunStatus}
import cromwell.core.ExecutionEvent
import cromwell.services.cost.InstantiatedVmInfo
import cromwell.services.metadata.CallMetadataKeys

import scala.annotation.unused
import scala.concurrent.{ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -139,32 +136,14 @@ object BatchRequestExecutor {
)
lazy val exitCode = findBatchExitCode(events)

// Get vm info for this job
val allocationPolicy = job.getAllocationPolicy

// Get instances that can be created with this AllocationPolicy, only instances[0] is supported
val instancePolicy = allocationPolicy.getInstances(0).getPolicy
val machineType = instancePolicy.getMachineType
val preemtible = instancePolicy.getProvisioningModelValue == ProvisioningModel.PREEMPTIBLE.getNumber

// location list = [regions/us-central1, zones/us-central1-b], region is the first element
val location = allocationPolicy.getLocation.getAllowedLocationsList.get(0)
val region =
if (location.isEmpty)
"us-central1"
else
location.split("/").last

val instantiatedVmInfo = Some(InstantiatedVmInfo(region, machineType, preemtible))

if (job.getStatus.getState == JobStatus.State.SUCCEEDED) {
RunStatus.Success(events, instantiatedVmInfo)
RunStatus.Success(events)
} else if (job.getStatus.getState == JobStatus.State.RUNNING) {
RunStatus.Running(events, instantiatedVmInfo)
RunStatus.Running(events)
} else if (job.getStatus.getState == JobStatus.State.FAILED) {
RunStatus.Failed(exitCode, events, instantiatedVmInfo)
RunStatus.Failed(exitCode, events)
} else {
RunStatus.Initializing(events, instantiatedVmInfo)
RunStatus.Initializing(events)
}
}

Expand All @@ -173,20 +152,12 @@ object BatchRequestExecutor {
GcpBatchExitCode.fromEventMessage(e.name.toLowerCase)
}.headOption

private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] = {
val startedRegex = ".*SCHEDULED to RUNNING.*".r
val endedRegex = ".*RUNNING to.*".r // can be SUCCEEDED or FAILED
private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] =
events.map { e =>
val time = java.time.Instant
.ofEpochSecond(e.getEventTime.getSeconds, e.getEventTime.getNanos.toLong)
.atOffset(java.time.ZoneOffset.UTC)
val eventType = e.getDescription match {
case startedRegex() => CallMetadataKeys.VmStartTime
case endedRegex() => CallMetadataKeys.VmEndTime
case _ => e.getType
}
ExecutionEvent(name = eventType, offsetDateTime = time)
ExecutionEvent(name = e.getDescription, offsetDateTime = time)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,24 @@
package cromwell.backend.google.batch.models

import cromwell.core.ExecutionEvent
import cromwell.services.cost.InstantiatedVmInfo

sealed trait RunStatus {
def eventList: Seq[ExecutionEvent]
def toString: String

val instantiatedVmInfo: Option[InstantiatedVmInfo]
}

object RunStatus {

case class Initializing(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty)
extends RunStatus { override def toString = "Initializing" }
case class AwaitingCloudQuota(eventList: Seq[ExecutionEvent],
instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty
) extends RunStatus {
case class Initializing(eventList: Seq[ExecutionEvent]) extends RunStatus { override def toString = "Initializing" }
case class AwaitingCloudQuota(eventList: Seq[ExecutionEvent]) extends RunStatus {
override def toString = "AwaitingCloudQuota"
}

case class Running(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty)
extends RunStatus { override def toString = "Running" }
case class Running(eventList: Seq[ExecutionEvent]) extends RunStatus { override def toString = "Running" }

sealed trait TerminalRunStatus extends RunStatus

case class Success(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty)
extends TerminalRunStatus {
case class Success(eventList: Seq[ExecutionEvent]) extends TerminalRunStatus {
override def toString = "Success"
}

Expand All @@ -37,8 +29,7 @@ object RunStatus {

final case class Failed(
exitCode: Option[GcpBatchExitCode],
eventList: Seq[ExecutionEvent],
instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty
eventList: Seq[ExecutionEvent]
) extends UnsuccessfulRunStatus {
override def toString = "Failed"

Expand Down Expand Up @@ -67,9 +58,7 @@ object RunStatus {
}
}

final case class Aborted(eventList: Seq[ExecutionEvent],
instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty
) extends UnsuccessfulRunStatus {
final case class Aborted(eventList: Seq[ExecutionEvent]) extends UnsuccessfulRunStatus {
override def toString = "Aborted"

override val exitCode: Option[GcpBatchExitCode] = None
Expand Down

This file was deleted.

Loading

0 comments on commit 860429b

Please sign in to comment.