diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 3c0e1ab9422..009684522bf 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -88,6 +88,7 @@ object Dependencies { private val metrics3StatsdV = "4.2.0" private val mockFtpServerV = "3.0.0" private val mockitoV = "3.12.4" + private val mockitoInlineV = "2.8.9" private val mockserverNettyV = "5.14.0" private val mouseV = "1.0.11" @@ -625,7 +626,8 @@ object Dependencies { "org.scalatest" %% "scalatest" % scalatestV, // Use mockito Java DSL directly instead of the numerous and often hard to keep updated Scala DSLs. // See also scaladoc in common.mock.MockSugar and that trait's various usages. - "org.mockito" % "mockito-core" % mockitoV + "org.mockito" % "mockito-core" % mockitoV, + "org.mockito" % "mockito-inline" % mockitoInlineV ) ++ slf4jBindingDependencies // During testing, add an slf4j binding for _all_ libraries. val kindProjectorPlugin = "org.typelevel" % "kind-projector" % kindProjectorV cross CrossVersion.full diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala index 0f5c00fa834..f88651552db 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala @@ -47,6 +47,9 @@ class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters) case event if event.name == CallMetadataKeys.VmEndTime => event.offsetDateTime } + override def extractVmInfoFromRunState(pollStatus: RunStatus): Option[InstantiatedVmInfo] = + pollStatus.instantiatedVmInfo + override def handleVmCostLookup(vmInfo: InstantiatedVmInfo) = { val request = GcpCostLookupRequest(vmInfo, self) params.serviceRegistry ! request @@ -69,6 +72,7 @@ class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters) } override def receive: Receive = { + case costResponse: GcpCostLookupResponse => handleCostResponse(costResponse) case message: PollResultMessage => message match { case ProcessThisPollResult(pollResult: RunStatus) => processPollResult(pollResult) @@ -93,5 +97,4 @@ class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters) override def params: PollMonitorParameters = pollMonitorParameters - override def extractVmInfoFromRunState(pollStatus: RunStatus): Option[InstantiatedVmInfo] = Option.empty // TODO } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala index 58ab8c49dc4..9aa98c8da8b 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala @@ -1025,6 +1025,18 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } yield status } + override val pollingResultMonitorActor: Option[ActorRef] = Option( + context.actorOf( + BatchPollResultMonitorActor.props(serviceRegistryActor, + workflowDescriptor, + jobDescriptor, + validatedRuntimeAttributes, + platform, + jobLogger + ) + ) + ) + override def isTerminal(runStatus: RunStatus): Boolean = runStatus match { case _: RunStatus.TerminalRunStatus => true @@ -1070,7 +1082,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar Future.fromTry { Try { runStatus match { - case RunStatus.Aborted(_) => AbortedExecutionHandle + case RunStatus.Aborted(_, _) => AbortedExecutionHandle case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus) case unknown => throw new RuntimeException( diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala index ae8e67b75e2..547dbc853a8 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala @@ -1,6 +1,7 @@ package cromwell.backend.google.batch.api.request import com.google.api.gax.rpc.{ApiException, StatusCode} +import com.google.cloud.batch.v1.AllocationPolicy.ProvisioningModel import com.google.cloud.batch.v1._ import com.typesafe.scalalogging.LazyLogging import cromwell.backend.google.batch.actors.BatchApiAbortClient.{ @@ -11,6 +12,8 @@ import cromwell.backend.google.batch.api.BatchApiRequestManager._ import cromwell.backend.google.batch.api.{BatchApiRequestManager, BatchApiResponse} import cromwell.backend.google.batch.models.{GcpBatchExitCode, RunStatus} import cromwell.core.ExecutionEvent +import cromwell.services.cost.InstantiatedVmInfo +import cromwell.services.metadata.CallMetadataKeys import scala.annotation.unused import scala.concurrent.{ExecutionContext, Future, Promise} @@ -136,14 +139,32 @@ object BatchRequestExecutor { ) lazy val exitCode = findBatchExitCode(events) + // Get vm info for this job + val allocationPolicy = job.getAllocationPolicy + + // Get instances that can be created with this AllocationPolicy, only instances[0] is supported + val instancePolicy = allocationPolicy.getInstances(0).getPolicy + val machineType = instancePolicy.getMachineType + val preemtible = instancePolicy.getProvisioningModelValue == ProvisioningModel.PREEMPTIBLE.getNumber + + // location list = [regions/us-central1, zones/us-central1-b], region is the first element + val location = allocationPolicy.getLocation.getAllowedLocationsList.get(0) + val region = + if (location.isEmpty) + "us-central1" + else + location.split("/").last + + val instantiatedVmInfo = Some(InstantiatedVmInfo(region, machineType, preemtible)) + if (job.getStatus.getState == JobStatus.State.SUCCEEDED) { - RunStatus.Success(events) + RunStatus.Success(events, instantiatedVmInfo) } else if (job.getStatus.getState == JobStatus.State.RUNNING) { - RunStatus.Running(events) + RunStatus.Running(events, instantiatedVmInfo) } else if (job.getStatus.getState == JobStatus.State.FAILED) { - RunStatus.Failed(exitCode, events) + RunStatus.Failed(exitCode, events, instantiatedVmInfo) } else { - RunStatus.Initializing(events) + RunStatus.Initializing(events, instantiatedVmInfo) } } @@ -152,12 +173,20 @@ object BatchRequestExecutor { GcpBatchExitCode.fromEventMessage(e.name.toLowerCase) }.headOption - private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] = + private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] = { + val startedRegex = ".*SCHEDULED to RUNNING.*".r + val endedRegex = ".*RUNNING to.*".r // can be SUCCEEDED or FAILED events.map { e => val time = java.time.Instant .ofEpochSecond(e.getEventTime.getSeconds, e.getEventTime.getNanos.toLong) .atOffset(java.time.ZoneOffset.UTC) - ExecutionEvent(name = e.getDescription, offsetDateTime = time) + val eventType = e.getDescription match { + case startedRegex() => CallMetadataKeys.VmStartTime + case endedRegex() => CallMetadataKeys.VmEndTime + case _ => e.getType + } + ExecutionEvent(name = eventType, offsetDateTime = time) } + } } } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala index b231e6e969a..80c1b63f405 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala @@ -1,24 +1,32 @@ package cromwell.backend.google.batch.models import cromwell.core.ExecutionEvent +import cromwell.services.cost.InstantiatedVmInfo sealed trait RunStatus { def eventList: Seq[ExecutionEvent] def toString: String + + val instantiatedVmInfo: Option[InstantiatedVmInfo] } object RunStatus { - case class Initializing(eventList: Seq[ExecutionEvent]) extends RunStatus { override def toString = "Initializing" } - case class AwaitingCloudQuota(eventList: Seq[ExecutionEvent]) extends RunStatus { + case class Initializing(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty) + extends RunStatus { override def toString = "Initializing" } + case class AwaitingCloudQuota(eventList: Seq[ExecutionEvent], + instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty + ) extends RunStatus { override def toString = "AwaitingCloudQuota" } - case class Running(eventList: Seq[ExecutionEvent]) extends RunStatus { override def toString = "Running" } + case class Running(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty) + extends RunStatus { override def toString = "Running" } sealed trait TerminalRunStatus extends RunStatus - case class Success(eventList: Seq[ExecutionEvent]) extends TerminalRunStatus { + case class Success(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty) + extends TerminalRunStatus { override def toString = "Success" } @@ -29,7 +37,8 @@ object RunStatus { final case class Failed( exitCode: Option[GcpBatchExitCode], - eventList: Seq[ExecutionEvent] + eventList: Seq[ExecutionEvent], + instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty ) extends UnsuccessfulRunStatus { override def toString = "Failed" @@ -58,7 +67,9 @@ object RunStatus { } } - final case class Aborted(eventList: Seq[ExecutionEvent]) extends UnsuccessfulRunStatus { + final case class Aborted(eventList: Seq[ExecutionEvent], + instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty + ) extends UnsuccessfulRunStatus { override def toString = "Aborted" override val exitCode: Option[GcpBatchExitCode] = None diff --git a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActorSpec.scala b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActorSpec.scala new file mode 100644 index 00000000000..2df777e0225 --- /dev/null +++ b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActorSpec.scala @@ -0,0 +1,156 @@ +package cromwell.backend.google.batch.actors + +import akka.actor.{ActorRef, ActorSystem, Props} +import akka.testkit.{TestKit, TestProbe} +import cats.data.Validated.Valid +import common.mock.MockSugar +import cromwell.backend.google.batch.models.GcpBatchRuntimeAttributes +import cromwell.backend.{BackendJobDescriptor, BackendJobDescriptorKey, RuntimeAttributeDefinition} +import cromwell.core.callcaching.NoDocker +import cromwell.core.{ExecutionEvent, WorkflowOptions} +import cromwell.core.logging.JobLogger +import cromwell.services.cost.{GcpCostLookupRequest, GcpCostLookupResponse, InstantiatedVmInfo} +import cromwell.services.keyvalue.InMemoryKvServiceActor +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers +import cromwell.backend.google.batch.models.GcpBatchTestConfig._ +import wom.graph.CommandCallNode +import cromwell.backend._ +import cromwell.backend.google.batch.models._ +import cromwell.backend.io.TestWorkflows +import cromwell.backend.standard.pollmonitoring.ProcessThisPollResult +import cromwell.services.metadata.CallMetadataKeys +import cromwell.services.metadata.MetadataService.PutMetadataAction +import org.slf4j.helpers.NOPLogger +import wom.values.WomString + +import java.time.{Instant, OffsetDateTime} +import java.time.temporal.ChronoUnit +import scala.concurrent.duration.DurationInt + +class BatchPollResultMonitorActorSpec + extends TestKit(ActorSystem("BatchPollResultMonitorActorSpec")) + with AnyFlatSpecLike + with BackendSpec + with Matchers + with MockSugar { + + var kvService: ActorRef = system.actorOf(Props(new InMemoryKvServiceActor), "kvService") + val runtimeAttributesBuilder = GcpBatchRuntimeAttributes.runtimeAttributesBuilder(gcpBatchConfiguration) + val jobLogger = mock[JobLogger] + val serviceRegistry = TestProbe() + + val workflowDescriptor = buildWdlWorkflowDescriptor(TestWorkflows.HelloWorld) + val call: CommandCallNode = workflowDescriptor.callable.taskCallNodes.head + val jobKey = BackendJobDescriptorKey(call, None, 1) + + val jobDescriptor = BackendJobDescriptor(workflowDescriptor, + jobKey, + runtimeAttributes = Map.empty, + evaluatedTaskInputs = Map.empty, + NoDocker, + None, + Map.empty + ) + + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest")) + + val staticRuntimeAttributeDefinitions: Set[RuntimeAttributeDefinition] = + GcpBatchRuntimeAttributes.runtimeAttributesBuilder(GcpBatchTestConfig.gcpBatchConfiguration).definitions.toSet + + val defaultedAttributes = + RuntimeAttributeDefinition.addDefaultsToAttributes(staticRuntimeAttributeDefinitions, + WorkflowOptions.fromMap(Map.empty).get + )( + runtimeAttributes + ) + val validatedRuntimeAttributes = runtimeAttributesBuilder.build(defaultedAttributes, NOPLogger.NOP_LOGGER) + + val actor = system.actorOf( + BatchPollResultMonitorActor.props(serviceRegistry.ref, + workflowDescriptor, + jobDescriptor, + validatedRuntimeAttributes, + Some(Gcp), + jobLogger + ) + ) + val vmInfo = InstantiatedVmInfo("europe-west9", "custom-16-32768", false) + + behavior of "BatchPollResultMonitorActor" + + it should "send a cost lookup request with the correct vm info after receiving a success pollResult" in { + + val terminalPollResult = + RunStatus.Success(Seq(ExecutionEvent("fakeEvent", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))), + Some(vmInfo) + ) + val message = ProcessThisPollResult(terminalPollResult) + + actor ! message + + serviceRegistry.expectMsgPF(1.seconds) { case m: GcpCostLookupRequest => + m.vmInfo shouldBe vmInfo + } + } + + it should "emit the correct cost metadata after receiving a costLookupResponse" in { + + val costLookupResponse = GcpCostLookupResponse(vmInfo, Valid(BigDecimal(0.1))) + + actor ! costLookupResponse + + serviceRegistry.expectMsgPF(1.seconds) { case m: PutMetadataAction => + val event = m.events.head + m.events.size shouldBe 1 + event.key.key shouldBe CallMetadataKeys.VmCostPerHour + event.value.get.value shouldBe "0.1" + } + } + + it should "emit the correct start time after receiving a running pollResult" in { + + val vmStartTime = OffsetDateTime.now().minus(2, ChronoUnit.HOURS) + val pollResult = RunStatus.Running( + Seq(ExecutionEvent(CallMetadataKeys.VmStartTime, vmStartTime)), + Some(vmInfo) + ) + val message = ProcessThisPollResult(pollResult) + + actor ! message + + serviceRegistry.expectMsgPF(1.seconds) { case m: PutMetadataAction => + val event = m.events.head + m.events.size shouldBe 1 + event.key.key shouldBe CallMetadataKeys.VmStartTime + assert( + Instant + .parse(event.value.get.value) + .equals(vmStartTime.toInstant.truncatedTo(ChronoUnit.MILLIS)) + ) + } + } + + it should "emit the correct end time after receiving a running pollResult" in { + + val vmEndTime = OffsetDateTime.now().minus(2, ChronoUnit.HOURS) + val pollResult = RunStatus.Running( + Seq(ExecutionEvent(CallMetadataKeys.VmEndTime, vmEndTime)), + Some(vmInfo) + ) + val message = ProcessThisPollResult(pollResult) + + actor ! message + + serviceRegistry.expectMsgPF(1.seconds) { case m: PutMetadataAction => + val event = m.events.head + m.events.size shouldBe 1 + event.key.key shouldBe CallMetadataKeys.VmEndTime + assert( + Instant + .parse(event.value.get.value) + .equals(vmEndTime.toInstant.truncatedTo(ChronoUnit.MILLIS)) + ) + } + } +} diff --git a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/api/BatchRequestExecutorSpec.scala b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/api/BatchRequestExecutorSpec.scala new file mode 100644 index 00000000000..79ac68772dd --- /dev/null +++ b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/api/BatchRequestExecutorSpec.scala @@ -0,0 +1,191 @@ +package cromwell.backend.google.batch.api.request + +import akka.actor.ActorSystem +import akka.testkit.TestKit +import com.google.cloud.batch.v1.{ + AllocationPolicy, + BatchServiceClient, + BatchServiceSettings, + GetJobRequest, + Job, + JobStatus, + StatusEvent +} +import com.google.cloud.batch.v1.AllocationPolicy.{ + InstancePolicy, + InstancePolicyOrTemplate, + LocationPolicy, + ProvisioningModel +} +import com.google.cloud.batch.v1.JobStatus.State +import com.google.protobuf.Timestamp +import common.mock.MockSugar +import cromwell.backend.google.batch.api.BatchApiResponse +import cromwell.backend.google.batch.models.RunStatus +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.doReturn +import org.scalatest.PrivateMethodTester +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers + +class BatchRequestExecutorSpec + extends TestKit(ActorSystem("BatchRequestExecutorSpec")) + with AnyFlatSpecLike + with Matchers + with MockSugar + with PrivateMethodTester { + + def setupBatchClient(machineType: String = "n1-standard-1", + location: String = "regions/us-central1", + jobState: State = JobStatus.State.SUCCEEDED + ): BatchServiceClient = { + val instancePolicy = InstancePolicy + .newBuilder() + .setMachineType(machineType) + .setProvisioningModel(ProvisioningModel.PREEMPTIBLE) + .build() + + val allocationPolicy = AllocationPolicy + .newBuilder() + .setLocation(LocationPolicy.newBuilder().addAllowedLocations(location)) + .addInstances(InstancePolicyOrTemplate.newBuilder().setPolicy(instancePolicy)) + .build() + + val startStatusEvent = StatusEvent + .newBuilder() + .setType("STATUS_CHANGED") + .setEventTime(Timestamp.newBuilder().setSeconds(1).build()) + .setDescription("Job state is set from SCHEDULED to RUNNING for job...") + .build() + + val endStatusEvent = StatusEvent + .newBuilder() + .setType("STATUS_CHANGED") + .setEventTime(Timestamp.newBuilder().setSeconds(2).build()) + .setDescription("Job state is set from RUNNING to SOME_OTHER_STATUS for job...") + .build() + + val jobStatus = JobStatus + .newBuilder() + .setState(jobState) + .addStatusEvents(startStatusEvent) + .addStatusEvents(endStatusEvent) + .build() + + val job = Job.newBuilder().setAllocationPolicy(allocationPolicy).setStatus(jobStatus).build() + + val mockClient = mock[BatchServiceClient] + doReturn(job).when(mockClient).getJob(any[GetJobRequest]) + doReturn(job).when(mockClient).getJob(any[String]) + + mockClient + } + + behavior of "BatchRequestExecutor" + + it should "create instantiatedVmInfo correctly" in { + + val mockClient = setupBatchClient(jobState = JobStatus.State.RUNNING) + // Create the BatchRequestExecutor + val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build()) + + // testing a private method see https://www.scalatest.org/user_guide/using_PrivateMethodTester + val internalGetHandler = PrivateMethod[BatchApiResponse.StatusQueried](Symbol("internalGetHandler")) + val result = batchRequestExecutor invokePrivate internalGetHandler(mockClient, GetJobRequest.newBuilder().build()) + + // Verify the instantiatedVmInfo + result.status match { + case RunStatus.Running(_, Some(instantiatedVmInfo)) => + instantiatedVmInfo.region shouldBe "us-central1" + instantiatedVmInfo.machineType shouldBe "n1-standard-1" + instantiatedVmInfo.preemptible shouldBe true + case _ => fail("Expected RunStatus.Running with instantiatedVmInfo") + } + } + + it should "create instantiatedVmInfo correctly with different location info" in { + + val mockClient = setupBatchClient(location = "zones/us-central1-a", jobState = JobStatus.State.RUNNING) + + // Create the BatchRequestExecutor + val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build()) + + // testing a private method see https://www.scalatest.org/user_guide/using_PrivateMethodTester + val internalGetHandler = PrivateMethod[BatchApiResponse.StatusQueried](Symbol("internalGetHandler")) + val result = batchRequestExecutor invokePrivate internalGetHandler(mockClient, GetJobRequest.newBuilder().build()) + + // Verify the instantiatedVmInfo + result.status match { + case RunStatus.Running(_, Some(instantiatedVmInfo)) => + instantiatedVmInfo.region shouldBe "us-central1-a" + instantiatedVmInfo.machineType shouldBe "n1-standard-1" + instantiatedVmInfo.preemptible shouldBe true + case _ => fail("Expected RunStatus.Running with instantiatedVmInfo") + } + } + + it should "create instantiatedVmInfo correctly with missing location info" in { + + val mockClient = setupBatchClient(jobState = JobStatus.State.RUNNING) + + // Create the BatchRequestExecutor + val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build()) + + // testing a private method see https://www.scalatest.org/user_guide/using_PrivateMethodTester + val internalGetHandler = PrivateMethod[BatchApiResponse.StatusQueried](Symbol("internalGetHandler")) + val result = batchRequestExecutor invokePrivate internalGetHandler(mockClient, GetJobRequest.newBuilder().build()) + + // Verify the instantiatedVmInfo + result.status match { + case RunStatus.Running(_, Some(instantiatedVmInfo)) => + instantiatedVmInfo.region shouldBe "us-central1" + instantiatedVmInfo.machineType shouldBe "n1-standard-1" + instantiatedVmInfo.preemptible shouldBe true + case _ => fail("Expected RunStatus.Running with instantiatedVmInfo") + } + } + + it should "send vmStartTime and vmEndTime metadata info when a workflow succeeds" in { + + val mockClient = setupBatchClient() + + // Create the BatchRequestExecutor + val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build()) + + // testing a private method see https://www.scalatest.org/user_guide/using_PrivateMethodTester + val internalGetHandler = PrivateMethod[BatchApiResponse.StatusQueried](Symbol("internalGetHandler")) + val result = batchRequestExecutor invokePrivate internalGetHandler(mockClient, GetJobRequest.newBuilder().build()) + + // Verify the events + result.status match { + case RunStatus.Success(events, _) => + val eventNames = events.map(_.name) + val eventTimes = events.map(_.offsetDateTime.toString) + eventNames should contain theSameElementsAs List("vmStartTime", "vmEndTime") + eventTimes should contain theSameElementsAs List("1970-01-01T00:00:01Z", "1970-01-01T00:00:02Z") + case _ => fail("Expected RunStatus.Success with events") + } + } + + it should "send vmStartTime and vmEndTime metadata info when a workflow fails" in { + val mockClient = setupBatchClient(jobState = JobStatus.State.FAILED) + + // Create the BatchRequestExecutor + val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build()) + + // testing a private method see https://www.scalatest.org/user_guide/using_PrivateMethodTester + val internalGetHandler = PrivateMethod[BatchApiResponse.StatusQueried](Symbol("internalGetHandler")) + val result = batchRequestExecutor invokePrivate internalGetHandler(mockClient, GetJobRequest.newBuilder().build()) + + // Verify the events + result.status match { + case RunStatus.Failed(_, events, _) => + val eventNames = events.map(_.name) + val eventTimes = events.map(_.offsetDateTime.toString) + eventNames should contain theSameElementsAs List("vmStartTime", "vmEndTime") + eventTimes should contain theSameElementsAs List("1970-01-01T00:00:01Z", "1970-01-01T00:00:02Z") + case _ => fail("Expected RunStatus.Success with events") + } + } + +}