// The Azure NIO library uses `{containerName}:` as the root of the path (treating the blob container within
// the storage account similarly to a drive within a computer). This doesn't work well for our need to easily
// transfer back and forth to and from the blob URL format. It also causes the library to garble full https://
// URLs that it receives: it interprets the scheme as the container name and collapses the double slash,
// handing back `https:/<account>.blob.core.windows.net/<container>/<path inside container>`.
//
// We transform these library-generated paths in two steps:
//  1) If the path looks like `https:/<account>.blob.core.windows.net/<container>/<path>` (single slash!),
//     rewrite it to the `<container>:/<path>` format the library normally produces.
//  2) Strip everything up to and including the first colon, leaving the absolute path inside the container.
//     A string that contains no colon is returned unchanged.
//
// The dots of the host name are escaped so they match a literal `.` only; left unescaped they would match
// any character and let look-alike host names through step 1.
private val brokenPathRegex = """https:/([a-z0-9]+)\.blob\.core\.windows\.net/([-a-zA-Z0-9]+)/(.*)""".r

def cleanedNioPathString(nioString: String): String = {
  val pathStr = nioString match {
    // The storage account capture is not needed: only the container and the in-container path survive.
    case brokenPathRegex(_, containerName, pathInContainer) => s"$containerName:/$pathInContainer"
    case _ => nioString
  }
  // indexOf returns -1 when there is no colon, so substring(0) keeps the whole string in that case.
  pathStr.substring(pathStr.indexOf(":") + 1)
}
// In-container path that every path-cleaning test below expects to get back.
private val cleanedStdoutPath =
  "/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout"

// Runs BlobPath.cleanedNioPathString on `input` and asserts that the result equals `expected`.
private def testBlobNioStringCleaning(input: String, expected: String) = {
  val cleaned = BlobPath.cleanedNioPathString(input)
  cleaned shouldBe expected
}

it should "clean the NIO path string when it has a garbled http protocol" in {
  testBlobNioStringCleaning(
    input = "https:/lz43.blob.core.windows.net/sc-ebda3e/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout",
    expected = cleanedStdoutPath
  )
}

it should "clean the NIO path string when it has a container name with colon prefix" in {
  testBlobNioStringCleaning(
    input = "sc-ebda3e:/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout",
    expected = cleanedStdoutPath
  )
}

it should "clean the NIO path string when it's an in-container absolute path" in {
  // An already-clean path must pass through untouched.
  testBlobNioStringCleaning(input = cleanedStdoutPath, expected = cleanedStdoutPath)
}

it should "clean the NIO path string when it's the root directory only" in {
  testBlobNioStringCleaning(input = "sc-ebda3e:", expected = "")
}

//// The below tests are IGNORED because they depend on Azure auth information being present in the environment ////
val subscriptionId = SubscriptionId(UUID.fromString("62b22893-6bc1-46d9-8a90-806bb3cce3c9"))

ignore should "resolve an absolute path string correctly to a path" in {
  val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
  val container = BlobContainerName("inputs")
  val tokenGenerator = NativeBlobSasTokenGenerator(container, endpoint, Some(subscriptionId))
  val fileSystemManager: BlobFileSystemManager = new BlobFileSystemManager(container, endpoint, 10, tokenGenerator)

  val expectedRoot = "https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution"
  val absoluteFile = "https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution/test/inputFile.txt"

  val rootString = s"${endpoint.value}/${container.value}/cromwell-execution"
  val blobRoot: BlobPath = new BlobPathBuilder(container, endpoint)(fileSystemManager).build(rootString).getOrElse(fail())
  blobRoot.toAbsolutePath.pathAsString should equal (expectedRoot)

  // Resolving a full URL against the root is expected to yield that same URL, not a concatenation.
  val resolvedFile = blobRoot.resolve(absoluteFile)
  resolvedFile.toAbsolutePath.pathAsString should equal (absoluteFile)
}
// Headers that should be included with all requests to the TES server.
// Produces at most one header: an `Authorization` header whose value is the configured token.
// The list is empty when no token is configured or when the token does not parse into a header.
// NOTE(review): a token that fails to parse is dropped without any diagnostic, so requests would go out
// unauthenticated - confirm this best-effort behaviour is intended.
private def requestHeaders: List[HttpHeader] =
  tesConfiguration.token
    .map(HttpHeader.parse("Authorization", _))
    .collect { case Ok(header, _) => header }
    .toList