diff --git a/CHANGELOG.md b/CHANGELOG.md index 96e2fed6f58..762b36ed680 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,11 +8,18 @@ Information on how to properly use the Singularity cache with Cromwell is now provided in the [Cromwell Singularity documentation]( https://cromwell.readthedocs.io/en/stable/tutorials/Containers/#singularity). +### Google library upgrade [(#5565)](https://github.com/broadinstitute/cromwell/pull/5565) + +All previous versions of Cromwell shipped with Google Cloud Storage (GCS) libraries that are now deprecated and will [stop working in August 2020](https://developers.googleblog.com/2018/03/discontinuing-support-for-json-rpc-and.html). This release adopts updated libraries to ensure uninterrupted operation. The only user action required is upgrading Cromwell. + ### Bug fixes * Fixed a bug that required Cromwell to be restarted in order to pick up DNS changes. * By default, the JVM caches DNS records with a TTL of infinity. * Cromwell now configures its JVM with a 3-minute TTL. This value can be customized by setting `system.dns-cache-ttl`. +* Clarified an error message that Cromwell emits when the compute backend terminates a job of its own volition (as opposed to termination in response to an abort request from Cromwell) + * Previously, the error read `The job was aborted from outside Cromwell` + * The new error reads `The compute backend terminated the job. If this termination is unexpected, examine likely causes such as preemption, running out of disk or memory on the compute instance, or exceeding the backend's maximum job duration.` ## 51 Release Notes diff --git a/engine/src/main/scala/cromwell/engine/io/IoActor.scala b/engine/src/main/scala/cromwell/engine/io/IoActor.scala index 8d5d079f862..e267f2ad1d2 100644 --- a/engine/src/main/scala/cromwell/engine/io/IoActor.scala +++ b/engine/src/main/scala/cromwell/engine/io/IoActor.scala @@ -37,7 +37,8 @@ final class IoActor(queueSize: Int, nioParallelism: Int, gcsParallelism: Int, throttle: Option[Throttle], - override val serviceRegistryActor: ActorRef)(implicit val materializer: ActorMaterializer) + override val serviceRegistryActor: ActorRef, + applicationName: String)(implicit val materializer: ActorMaterializer) extends Actor with ActorLogging with StreamActorHelper[IoCommandContext[_]] with IoInstrumentation with Timers { implicit val ec = context.dispatcher @@ -56,7 +57,7 @@ final class IoActor(queueSize: Int, } private [io] lazy val defaultFlow = new NioFlow(parallelism = nioParallelism, context.system.scheduler, onRetry).flow.withAttributes(ActorAttributes.dispatcher(Dispatcher.IoDispatcher)) - private [io] lazy val gcsBatchFlow = new ParallelGcsBatchFlow(parallelism = gcsParallelism, batchSize = 100, context.system.scheduler, onRetry).flow.withAttributes(ActorAttributes.dispatcher(Dispatcher.IoDispatcher)) + private [io] lazy val gcsBatchFlow = new ParallelGcsBatchFlow(parallelism = gcsParallelism, batchSize = 100, context.system.scheduler, onRetry, applicationName).flow.withAttributes(ActorAttributes.dispatcher(Dispatcher.IoDispatcher)) protected val source = Source.queue[IoCommandContext[_]](queueSize, OverflowStrategy.dropNew) @@ -232,7 +233,7 @@ object IoActor { def isFatal(failure: Throwable) = !isRetryable(failure) - def props(queueSize: Int, nioParallelism: Int, gcsParallelism: Int, throttle: Option[Throttle], serviceRegistryActor: ActorRef)(implicit materializer: ActorMaterializer) = { - Props(new IoActor(queueSize, nioParallelism, gcsParallelism, throttle, serviceRegistryActor)).withDispatcher(IoDispatcher) + def props(queueSize: Int, nioParallelism: Int, gcsParallelism: Int, throttle: Option[Throttle], serviceRegistryActor: ActorRef, applicationName: String)(implicit materializer: ActorMaterializer) = { + Props(new IoActor(queueSize, nioParallelism, gcsParallelism, throttle, serviceRegistryActor, applicationName)).withDispatcher(IoDispatcher) } } diff --git a/engine/src/main/scala/cromwell/engine/io/gcs/GcsBatchFlow.scala b/engine/src/main/scala/cromwell/engine/io/gcs/GcsBatchFlow.scala index afbd929e8af..91691bf6d6c 100644 --- a/engine/src/main/scala/cromwell/engine/io/gcs/GcsBatchFlow.scala +++ b/engine/src/main/scala/cromwell/engine/io/gcs/GcsBatchFlow.scala @@ -39,7 +39,7 @@ object GcsBatchFlow { } } -class GcsBatchFlow(batchSize: Int, scheduler: Scheduler, onRetry: IoCommandContext[_] => Throwable => Unit)(implicit ec: ExecutionContext) { +class GcsBatchFlow(batchSize: Int, scheduler: Scheduler, onRetry: IoCommandContext[_] => Throwable => Unit, applicationName: String)(implicit ec: ExecutionContext) { // Does not carry any authentication, assumes all underlying requests are properly authenticated private val httpRequestInitializer = new HttpRequestInitializer { @@ -51,8 +51,13 @@ class GcsBatchFlow(batchSize: Int, scheduler: Scheduler, onRetry: IoCommandConte } private val batchRequest: BatchRequest = { - val storage = new Storage(GcsStorage.HttpTransport, JacksonFactory.getDefaultInstance, httpRequestInitializer) - storage.batch() + val storage = new Storage.Builder( + GcsStorage.HttpTransport, + JacksonFactory.getDefaultInstance, + httpRequestInitializer + ).setApplicationName(applicationName) + + storage.build().batch() } val flow = GraphDSL.create() { implicit builder => diff --git a/engine/src/main/scala/cromwell/engine/io/gcs/ParallelGcsBatchFlow.scala b/engine/src/main/scala/cromwell/engine/io/gcs/ParallelGcsBatchFlow.scala index eb339369ae0..ad6ea4671eb 100644 --- a/engine/src/main/scala/cromwell/engine/io/gcs/ParallelGcsBatchFlow.scala +++ b/engine/src/main/scala/cromwell/engine/io/gcs/ParallelGcsBatchFlow.scala @@ -11,7 +11,7 @@ import scala.concurrent.ExecutionContext /** * Balancer that distributes requests to multiple batch flows in parallel */ -class ParallelGcsBatchFlow(parallelism: Int, batchSize: Int, scheduler: Scheduler, onRetry: IoCommandContext[_] => Throwable => Unit)(implicit ec: ExecutionContext) { +class ParallelGcsBatchFlow(parallelism: Int, batchSize: Int, scheduler: Scheduler, onRetry: IoCommandContext[_] => Throwable => Unit, applicationName: String)(implicit ec: ExecutionContext) { val flow = GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ @@ -19,7 +19,7 @@ class ParallelGcsBatchFlow(parallelism: Int, batchSize: Int, scheduler: Schedule val merge = builder.add(Merge[IoResult](parallelism)) for (_ <- 1 to parallelism) { - val workerFlow = new GcsBatchFlow(batchSize, scheduler, onRetry).flow + val workerFlow = new GcsBatchFlow(batchSize, scheduler, onRetry, applicationName).flow // for each worker, add an edge from the balancer to the worker, then wire // it to the merge element balancer ~> workerFlow.async ~> merge diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/WorkflowExecutionActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/WorkflowExecutionActor.scala index 23efde68e4b..cf84036cd9c 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/WorkflowExecutionActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/WorkflowExecutionActor.scala @@ -251,7 +251,9 @@ case class WorkflowExecutionActor(params: WorkflowExecutionActorParams) // - The job lasted too long (eg PAPI 6 day timeout) // Treat it like any other non-retryable failure: case Event(JobAbortedResponse(jobKey), stateData) => - val cause = new Exception("The job was aborted from outside Cromwell") + val cause = new Exception( + "The compute backend terminated the job. If this termination is unexpected, examine likely causes such as preemption, running out of disk or memory on the compute instance, or exceeding the backend's maximum job duration." + ) handleNonRetryableFailure(stateData, jobKey, cause) // Sub Workflow - sub workflow failures are always non retryable case Event(SubWorkflowFailedResponse(jobKey, descendantJobKeys, reason), stateData) => diff --git a/engine/src/main/scala/cromwell/server/CromwellRootActor.scala b/engine/src/main/scala/cromwell/server/CromwellRootActor.scala index 94265b0b562..89fd813305f 100644 --- a/engine/src/main/scala/cromwell/server/CromwellRootActor.scala +++ b/engine/src/main/scala/cromwell/server/CromwellRootActor.scala @@ -7,6 +7,7 @@ import akka.pattern.GracefulStopSupport import akka.routing.RoundRobinPool import akka.stream.ActorMaterializer import com.typesafe.config.Config +import cromwell.cloudsupport.gcp.GoogleConfiguration import cromwell.core._ import cromwell.core.actor.StreamActorHelper.ActorRestartException import cromwell.core.filesystem.CromwellFileSystems @@ -107,7 +108,7 @@ abstract class CromwellRootActor(terminator: CromwellTerminator, lazy val nioParallelism = systemConfig.as[Option[Int]]("io.nio.parallelism").getOrElse(10) lazy val gcsParallelism = systemConfig.as[Option[Int]]("io.gcs.parallelism").getOrElse(10) lazy val ioThrottle = systemConfig.getAs[Throttle]("io").getOrElse(Throttle(100000, 100.seconds, 100000)) - lazy val ioActor = context.actorOf(IoActor.props(LoadConfig.IoQueueSize, nioParallelism, gcsParallelism, Option(ioThrottle), serviceRegistryActor), "IoActor") + lazy val ioActor = context.actorOf(IoActor.props(LoadConfig.IoQueueSize, nioParallelism, gcsParallelism, Option(ioThrottle), serviceRegistryActor, GoogleConfiguration(config).applicationName), "IoActor") lazy val ioActorProxy = context.actorOf(IoActorProxy.props(ioActor), "IoProxy") // Register the IoActor with the service registry: diff --git a/engine/src/test/scala/cromwell/engine/io/IoActorProxyGcsBatchSpec.scala b/engine/src/test/scala/cromwell/engine/io/IoActorProxyGcsBatchSpec.scala index 92b85f2a0a3..6a8a381861a 100644 --- a/engine/src/test/scala/cromwell/engine/io/IoActorProxyGcsBatchSpec.scala +++ b/engine/src/test/scala/cromwell/engine/io/IoActorProxyGcsBatchSpec.scala @@ -62,7 +62,7 @@ class IoActorProxyGcsBatchSpec extends TestKitSuite with FlatSpecLike with Match } def testWith(src: GcsPath, dst: GcsPath, directory: GcsPath) = { - val testActor = TestActorRef(new IoActor(10, 10, 10, None, TestProbe().ref)) + val testActor = TestActorRef(new IoActor(10, 10, 10, None, TestProbe().ref, "cromwell test")) val copyCommand = GcsBatchCopyCommand(src, dst, overwrite = false) val sizeCommand = GcsBatchSizeCommand(src) @@ -123,7 +123,7 @@ class IoActorProxyGcsBatchSpec extends TestKitSuite with FlatSpecLike with Match } it should "copy files across GCS storage classes" taggedAs IntegrationTest in { - val testActor = TestActorRef(new IoActor(10, 10, 10, None, TestProbe().ref)) + val testActor = TestActorRef(new IoActor(10, 10, 10, None, TestProbe().ref, "cromwell test")) val copyCommand = GcsBatchCopyCommand(srcRegional, dstMultiRegional, overwrite = false) diff --git a/engine/src/test/scala/cromwell/engine/io/IoActorSpec.scala b/engine/src/test/scala/cromwell/engine/io/IoActorSpec.scala index 0b7f4f88b84..9dc21c4ac6c 100644 --- a/engine/src/test/scala/cromwell/engine/io/IoActorSpec.scala +++ b/engine/src/test/scala/cromwell/engine/io/IoActorSpec.scala @@ -32,7 +32,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl } it should "copy a file" in { - val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref)) + val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test")) val src = DefaultPathBuilder.createTempFile() val dst: Path = src.parent.resolve(src.name + "-dst") @@ -51,7 +51,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl } it should "write to a file" in { - val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref)) + val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test")) val src = DefaultPathBuilder.createTempFile() @@ -68,7 +68,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl } it should "delete a file" in { - val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref)) + val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test")) val src = DefaultPathBuilder.createTempFile() @@ -84,7 +84,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl } it should "read a file" in { - val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref)) + val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test")) val src = DefaultPathBuilder.createTempFile() src.write("hello") @@ -103,7 +103,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl } it should "read only the first bytes of file" in { - val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref)) + val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test")) val src = DefaultPathBuilder.createTempFile() src.write("hello") @@ -122,7 +122,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl } it should "read the file if it's under the byte limit" in { - val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref)) + val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test")) val src = DefaultPathBuilder.createTempFile() src.write("hello") @@ -141,7 +141,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl } it should "fail if the file is larger than the read limit" in { - val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref)) + val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test")) val src = DefaultPathBuilder.createTempFile() src.write("hello") @@ -158,7 +158,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl } it should "return a file size" in { - val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref)) + val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test")) val src = DefaultPathBuilder.createTempFile() src.write("hello") @@ -177,7 +177,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl } it should "return a file md5 hash (local)" in { - val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref)) + val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test")) val src = DefaultPathBuilder.createTempFile() src.write("hello") @@ -196,7 +196,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl } it should "touch a file (local)" in { - val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref)) + val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test")) val src = DefaultPathBuilder.createTempFile() src.write("hello")