Fix warning on startup [no JIRA] (#5570)
aednichols authored Jul 10, 2020
1 parent 017de78 commit 381a54e
Showing 8 changed files with 39 additions and 23 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -8,11 +8,18 @@ Information on how to properly use the Singularity cache with Cromwell is now
provided in the [Cromwell Singularity documentation](
https://cromwell.readthedocs.io/en/stable/tutorials/Containers/#singularity).

### Google library upgrade [(#5565)](https://github.com/broadinstitute/cromwell/pull/5565)

All previous versions of Cromwell shipped with Google Cloud Storage (GCS) libraries that are now deprecated and will [stop working in August 2020](https://developers.googleblog.com/2018/03/discontinuing-support-for-json-rpc-and.html). This release adopts updated libraries to ensure uninterrupted operation. The only user action required is upgrading Cromwell.

### Bug fixes

* Fixed a bug that required Cromwell to be restarted in order to pick up DNS changes.
    * By default, the JVM caches DNS records with a TTL of infinity.
    * Cromwell now configures its JVM with a 3-minute TTL. This value can be customized by setting `system.dns-cache-ttl`.
* Clarified an error message that Cromwell emits when the compute backend terminates a job of its own volition (as opposed to termination in response to an abort request from Cromwell).
    * Previously, the error read `The job was aborted from outside Cromwell`.
    * The new error reads `The compute backend terminated the job. If this termination is unexpected, examine likely causes such as preemption, running out of disk or memory on the compute instance, or exceeding the backend's maximum job duration.`

## 51 Release Notes

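The DNS fix in the changelog above relies on the JVM's standard `networkaddress.cache.ttl` security property. Below is a minimal, standalone Scala sketch of that mechanism only; the object name and the hard-coded 180-second value are illustrative, and Cromwell's own wiring (reading `system.dns-cache-ttl` from configuration) is not shown.

```scala
import java.security.Security

object DnsCacheTtlSketch extends App {
  // By default (notably when a security manager is installed) the JVM caches
  // successful DNS lookups forever, so a long-running server never notices DNS changes.
  // A 180-second TTL mirrors the 3-minute default described in the changelog entry.
  Security.setProperty("networkaddress.cache.ttl", "180")

  // Confirm the property took effect.
  println(Security.getProperty("networkaddress.cache.ttl")) // prints "180"
}
```
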
9 changes: 5 additions & 4 deletions engine/src/main/scala/cromwell/engine/io/IoActor.scala
@@ -37,7 +37,8 @@ final class IoActor(queueSize: Int,
nioParallelism: Int,
gcsParallelism: Int,
throttle: Option[Throttle],
override val serviceRegistryActor: ActorRef)(implicit val materializer: ActorMaterializer)
override val serviceRegistryActor: ActorRef,
applicationName: String)(implicit val materializer: ActorMaterializer)
extends Actor with ActorLogging with StreamActorHelper[IoCommandContext[_]] with IoInstrumentation with Timers {
implicit val ec = context.dispatcher

@@ -56,7 +57,7 @@ }
}

private [io] lazy val defaultFlow = new NioFlow(parallelism = nioParallelism, context.system.scheduler, onRetry).flow.withAttributes(ActorAttributes.dispatcher(Dispatcher.IoDispatcher))
private [io] lazy val gcsBatchFlow = new ParallelGcsBatchFlow(parallelism = gcsParallelism, batchSize = 100, context.system.scheduler, onRetry).flow.withAttributes(ActorAttributes.dispatcher(Dispatcher.IoDispatcher))
private [io] lazy val gcsBatchFlow = new ParallelGcsBatchFlow(parallelism = gcsParallelism, batchSize = 100, context.system.scheduler, onRetry, applicationName).flow.withAttributes(ActorAttributes.dispatcher(Dispatcher.IoDispatcher))

protected val source = Source.queue[IoCommandContext[_]](queueSize, OverflowStrategy.dropNew)

@@ -232,7 +233,7 @@ object IoActor {

def isFatal(failure: Throwable) = !isRetryable(failure)

def props(queueSize: Int, nioParallelism: Int, gcsParallelism: Int, throttle: Option[Throttle], serviceRegistryActor: ActorRef)(implicit materializer: ActorMaterializer) = {
Props(new IoActor(queueSize, nioParallelism, gcsParallelism, throttle, serviceRegistryActor)).withDispatcher(IoDispatcher)
def props(queueSize: Int, nioParallelism: Int, gcsParallelism: Int, throttle: Option[Throttle], serviceRegistryActor: ActorRef, applicationName: String)(implicit materializer: ActorMaterializer) = {
Props(new IoActor(queueSize, nioParallelism, gcsParallelism, throttle, serviceRegistryActor, applicationName)).withDispatcher(IoDispatcher)
}
}
11 changes: 8 additions & 3 deletions engine/src/main/scala/cromwell/engine/io/gcs/GcsBatchFlow.scala
@@ -39,7 +39,7 @@ object GcsBatchFlow {
}
}

class GcsBatchFlow(batchSize: Int, scheduler: Scheduler, onRetry: IoCommandContext[_] => Throwable => Unit)(implicit ec: ExecutionContext) {
class GcsBatchFlow(batchSize: Int, scheduler: Scheduler, onRetry: IoCommandContext[_] => Throwable => Unit, applicationName: String)(implicit ec: ExecutionContext) {

// Does not carry any authentication, assumes all underlying requests are properly authenticated
private val httpRequestInitializer = new HttpRequestInitializer {
@@ -51,8 +51,13 @@ class GcsBatchFlow(batchSize: Int, scheduler: Scheduler, onRetry: IoCommandConte
}

private val batchRequest: BatchRequest = {
val storage = new Storage(GcsStorage.HttpTransport, JacksonFactory.getDefaultInstance, httpRequestInitializer)
storage.batch()
val storage = new Storage.Builder(
GcsStorage.HttpTransport,
JacksonFactory.getDefaultInstance,
httpRequestInitializer
).setApplicationName(applicationName)

storage.build().batch()
}

val flow = GraphDSL.create() { implicit builder =>
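
The `GcsBatchFlow` change above is the core of the startup-warning fix: constructing the `Storage` client through `Storage.Builder` and calling `setApplicationName` before `build()` stops the Google client library from warning at startup that no application name is set. Below is a minimal sketch of that builder pattern, assuming a placeholder application name and a no-op `HttpRequestInitializer`; the real code takes the name from `GoogleConfiguration` and uses the initializer defined earlier in the file.

```scala
import com.google.api.client.http.{HttpRequest, HttpRequestInitializer}
import com.google.api.client.http.javanet.NetHttpTransport
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.storage.Storage

object StorageClientSketch extends App {
  // No-op initializer, standing in for the configured one used by GcsBatchFlow.
  val initializer = new HttpRequestInitializer {
    override def initialize(request: HttpRequest): Unit = ()
  }

  val builder = new Storage.Builder(
    new NetHttpTransport,              // stand-in for GcsStorage.HttpTransport
    JacksonFactory.getDefaultInstance,
    initializer
  )

  // Setting the application name is the actual fix: without it the client
  // library logs a warning about a missing application name.
  val storage: Storage = builder.setApplicationName("cromwell-sketch").build()

  // storage.batch() would return the BatchRequest that GcsBatchFlow fills with
  // up to `batchSize` operations before executing it.
  println(storage.getApplicationName)
}
```
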
@@ -11,15 +11,15 @@ import scala.concurrent.ExecutionContext
/**
* Balancer that distributes requests to multiple batch flows in parallel
*/
class ParallelGcsBatchFlow(parallelism: Int, batchSize: Int, scheduler: Scheduler, onRetry: IoCommandContext[_] => Throwable => Unit)(implicit ec: ExecutionContext) {
class ParallelGcsBatchFlow(parallelism: Int, batchSize: Int, scheduler: Scheduler, onRetry: IoCommandContext[_] => Throwable => Unit, applicationName: String)(implicit ec: ExecutionContext) {

val flow = GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val balancer = builder.add(Balance[GcsBatchCommandContext[_, _]](parallelism, waitForAllDownstreams = false))
val merge = builder.add(Merge[IoResult](parallelism))

for (_ <- 1 to parallelism) {
val workerFlow = new GcsBatchFlow(batchSize, scheduler, onRetry).flow
val workerFlow = new GcsBatchFlow(batchSize, scheduler, onRetry, applicationName).flow
// for each worker, add an edge from the balancer to the worker, then wire
// it to the merge element
balancer ~> workerFlow.async ~> merge
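
`ParallelGcsBatchFlow` above fans commands out to `parallelism` copies of `GcsBatchFlow` through a `Balance` stage and merges the results back with `Merge`. The following self-contained sketch shows the same balance/merge GraphDSL pattern with a trivial stand-in worker (doubling integers) instead of a real GCS batch flow; the object name, worker, and element type are illustrative only.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}

object BalanceMergeSketch extends App {
  implicit val system: ActorSystem = ActorSystem("balance-merge-sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  val parallelism = 4

  // Distribute elements across `parallelism` workers, then merge their outputs.
  val parallelFlow: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val balancer = builder.add(Balance[Int](parallelism, waitForAllDownstreams = false))
    val merge    = builder.add(Merge[Int](parallelism))

    for (_ <- 1 to parallelism) {
      val worker = Flow[Int].map(_ * 2) // stand-in for a GcsBatchFlow worker
      balancer ~> worker.async ~> merge // `.async` gives each worker its own island, as in the real flow
    }

    FlowShape(balancer.in, merge.out)
  })

  Source(1 to 10).via(parallelFlow).runWith(Sink.foreach(println)).onComplete(_ => system.terminate())
}
```
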
@@ -251,7 +251,9 @@ case class WorkflowExecutionActor(params: WorkflowExecutionActorParams)
// - The job lasted too long (eg PAPI 6 day timeout)
// Treat it like any other non-retryable failure:
case Event(JobAbortedResponse(jobKey), stateData) =>
val cause = new Exception("The job was aborted from outside Cromwell")
val cause = new Exception(
"The compute backend terminated the job. If this termination is unexpected, examine likely causes such as preemption, running out of disk or memory on the compute instance, or exceeding the backend's maximum job duration."
)
handleNonRetryableFailure(stateData, jobKey, cause)
// Sub Workflow - sub workflow failures are always non retryable
case Event(SubWorkflowFailedResponse(jobKey, descendantJobKeys, reason), stateData) =>
@@ -7,6 +7,7 @@ import akka.pattern.GracefulStopSupport
import akka.routing.RoundRobinPool
import akka.stream.ActorMaterializer
import com.typesafe.config.Config
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.core._
import cromwell.core.actor.StreamActorHelper.ActorRestartException
import cromwell.core.filesystem.CromwellFileSystems
@@ -107,7 +108,7 @@ abstract class CromwellRootActor(terminator: CromwellTerminator,
lazy val nioParallelism = systemConfig.as[Option[Int]]("io.nio.parallelism").getOrElse(10)
lazy val gcsParallelism = systemConfig.as[Option[Int]]("io.gcs.parallelism").getOrElse(10)
lazy val ioThrottle = systemConfig.getAs[Throttle]("io").getOrElse(Throttle(100000, 100.seconds, 100000))
lazy val ioActor = context.actorOf(IoActor.props(LoadConfig.IoQueueSize, nioParallelism, gcsParallelism, Option(ioThrottle), serviceRegistryActor), "IoActor")
lazy val ioActor = context.actorOf(IoActor.props(LoadConfig.IoQueueSize, nioParallelism, gcsParallelism, Option(ioThrottle), serviceRegistryActor, GoogleConfiguration(config).applicationName), "IoActor")
lazy val ioActorProxy = context.actorOf(IoActorProxy.props(ioActor), "IoProxy")

// Register the IoActor with the service registry:
@@ -62,7 +62,7 @@ class IoActorProxyGcsBatchSpec extends TestKitSuite with FlatSpecLike with Match
}

def testWith(src: GcsPath, dst: GcsPath, directory: GcsPath) = {
val testActor = TestActorRef(new IoActor(10, 10, 10, None, TestProbe().ref))
val testActor = TestActorRef(new IoActor(10, 10, 10, None, TestProbe().ref, "cromwell test"))

val copyCommand = GcsBatchCopyCommand(src, dst, overwrite = false)
val sizeCommand = GcsBatchSizeCommand(src)
@@ -123,7 +123,7 @@ class IoActorProxyGcsBatchSpec extends TestKitSuite with FlatSpecLike with Match
}

it should "copy files across GCS storage classes" taggedAs IntegrationTest in {
val testActor = TestActorRef(new IoActor(10, 10, 10, None, TestProbe().ref))
val testActor = TestActorRef(new IoActor(10, 10, 10, None, TestProbe().ref, "cromwell test"))

val copyCommand = GcsBatchCopyCommand(srcRegional, dstMultiRegional, overwrite = false)

20 changes: 10 additions & 10 deletions engine/src/test/scala/cromwell/engine/io/IoActorSpec.scala
@@ -32,7 +32,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl
}

it should "copy a file" in {
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref))
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test"))

val src = DefaultPathBuilder.createTempFile()
val dst: Path = src.parent.resolve(src.name + "-dst")
@@ -51,7 +51,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl
}

it should "write to a file" in {
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref))
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test"))

val src = DefaultPathBuilder.createTempFile()

@@ -68,7 +68,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl
}

it should "delete a file" in {
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref))
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test"))

val src = DefaultPathBuilder.createTempFile()

@@ -84,7 +84,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl
}

it should "read a file" in {
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref))
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test"))

val src = DefaultPathBuilder.createTempFile()
src.write("hello")
@@ -103,7 +103,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl
}

it should "read only the first bytes of file" in {
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref))
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test"))

val src = DefaultPathBuilder.createTempFile()
src.write("hello")
@@ -122,7 +122,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl
}

it should "read the file if it's under the byte limit" in {
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref))
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test"))

val src = DefaultPathBuilder.createTempFile()
src.write("hello")
@@ -141,7 +141,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl
}

it should "fail if the file is larger than the read limit" in {
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref))
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test"))

val src = DefaultPathBuilder.createTempFile()
src.write("hello")
@@ -158,7 +158,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl
}

it should "return a file size" in {
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref))
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test"))

val src = DefaultPathBuilder.createTempFile()
src.write("hello")
@@ -177,7 +177,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl
}

it should "return a file md5 hash (local)" in {
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref))
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test"))

val src = DefaultPathBuilder.createTempFile()
src.write("hello")
@@ -196,7 +196,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl
}

it should "touch a file (local)" in {
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref))
val testActor = TestActorRef(new IoActor(1, 10, 10, None, TestProbe().ref, "cromwell test"))

val src = DefaultPathBuilder.createTempFile()
src.write("hello")
