Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-1105 Fix interpretation of full http blob paths #7138

Merged
merged 19 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,30 @@ class BlobPathBuilder(container: BlobContainerName, endpoint: EndpointURL)(priva
}

object BlobPath {
// The Azure NIO library uses `{containerName}:` as the root of the path.
// This doesn't work well for our need to easily transfer back and forth
// to and from the blob URL format. This method removes anything up to and including
// the first colon, to create a path string useful for working with BlobPath.
// This is safe because the NIO library enforces no colons except to mark
// the root container name.
private def nioPathString(nioPath: NioPath): String = {
val pathStr = nioPath.toString
// The Azure NIO library uses `{containerName}:` as the root of the path (treating the blob container within
// the storage account similarly to a drive within a computer). This doesn't work well for our need to easily
// transfer back and forth to and from the blob URL format. It also causes the library to garble full http://
// paths that it receives (it interprets `http` as the container name); it transforms them to http:/<remainder of path>
//
// We transform these library-generated paths in two steps:
// 1) If the path starts with http:/ (single slash!) transform it to the containerName:<path inside container>
// format the library expects
// 2) If the path looks like <container>:<path>, strip off the <container>: to leave the absolute path inside the container.
private val brokenPathRegex = "https:/([a-z0-9]+).blob.core.windows.net/([-a-zA-Z0-9]+)/(.*)".r
def cleanedNioPathString(nioString: String): String = {
val pathStr = nioString match {
case brokenPathRegex(_, containerName, pathInContainer) =>
s"${containerName}:/${pathInContainer}"
case _ => nioString
}
pathStr.substring(pathStr.indexOf(":")+1)
}

def apply(nioPath: NioPath,
endpoint: EndpointURL,
container: BlobContainerName,
fsm: BlobFileSystemManager): BlobPath = {
BlobPath(nioPathString(nioPath), endpoint, container)(fsm)
BlobPath(cleanedNioPathString(nioPath.toString), endpoint, container)(fsm)
}
}

Expand All @@ -95,7 +103,7 @@ case class BlobPath private[blob](pathString: String, endpoint: EndpointURL, con
override def pathAsString: String = List(endpoint, container, pathString.stripPrefix("/")).mkString("/")

//This is purposefully an unprotected get because if the endpoint cannot be parsed this should fail loudly rather than quietly
override def pathWithoutScheme: String = parseURI(endpoint.value).map(u => List(u.getHost, container, pathString).mkString("/")).get
override def pathWithoutScheme: String = parseURI(endpoint.value).map(u => List(u.getHost, container, pathString.stripPrefix("/")).mkString("/")).get

private def findNioPath(path: String): NioPath = (for {
fileSystem <- fsm.retrieveFilesystem()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.mockito.Mockito.when
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.util.UUID
import scala.util.{Failure, Try}

object BlobPathBuilderSpec {
Expand Down Expand Up @@ -57,12 +58,59 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
testException should contain(exception)
}

private def testBlobNioStringCleaning(input: String, expected: String) =
BlobPath.cleanedNioPathString(input) shouldBe expected

it should "clean the NIO path string when it has a garbled http protocol" in {
testBlobNioStringCleaning(
"https:/lz43.blob.core.windows.net/sc-ebda3e/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout",
"/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout"
)
}

it should "clean the NIO path string when it has a container name with colon prefix" in {
testBlobNioStringCleaning(
"sc-ebda3e:/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout",
"/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout"
)
}

it should "clean the NIO path string when it's an in-container absolute path" in {
testBlobNioStringCleaning(
"/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout",
"/workspace-services/cbas/terra-app-4628d0e1/test_all_engine_functions/4bb6a0a2-3b07/call-run_read_string/execution/stdout"
)
}

it should "clean the NIO path string when it's the root directory only" in {
testBlobNioStringCleaning(
"sc-ebda3e:",
""
)
}

//// The below tests are IGNORED because they depend on Azure auth information being present in the environment ////
val subscriptionId = SubscriptionId(UUID.fromString("62b22893-6bc1-46d9-8a90-806bb3cce3c9"))

ignore should "resolve an absolute path string correctly to a path" in {
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val store = BlobContainerName("inputs")
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint, Some(subscriptionId))
val fsm: BlobFileSystemManager = new BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator)

val rootString = s"${endpoint.value}/${store.value}/cromwell-execution"
val blobRoot: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build rootString getOrElse fail()
blobRoot.toAbsolutePath.pathAsString should equal ("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution")
val otherFile = blobRoot.resolve("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution/test/inputFile.txt")
otherFile.toAbsolutePath.pathAsString should equal ("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution/test/inputFile.txt")
}

ignore should "build a blob path from a test string and read a file" in {
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val endpointHost = BlobPathBuilder.parseURI(endpoint.value).map(_.getHost).getOrElse(fail("Could not parse URI"))
val store = BlobContainerName("inputs")
val evalPath = "/test/inputFile.txt"
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint)
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint, Some(subscriptionId))
val fsm: BlobFileSystemManager = new BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator)
val testString = endpoint.value + "/" + store + evalPath
val blobPath: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build testString getOrElse fail()
Expand All @@ -80,7 +128,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val store = BlobContainerName("inputs")
val evalPath = "/test/inputFile.txt"
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint)
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint, Some(subscriptionId))
val fsm: BlobFileSystemManager = new BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator)
val testString = endpoint.value + "/" + store + evalPath
val blobPath1: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build testString getOrElse fail()
Expand All @@ -95,7 +143,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
ignore should "resolve a path without duplicating container name" in {
val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val store = BlobContainerName("inputs")
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint)
val blobTokenGenerator = NativeBlobSasTokenGenerator(store, endpoint, Some(subscriptionId))
val fsm: BlobFileSystemManager = new BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator)

val rootString = s"${endpoint.value}/${store.value}/cromwell-execution"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package cromwell.backend.impl.tes

import common.exception.AggregatedMessageException

import java.io.FileNotFoundException
import java.nio.file.FileAlreadyExistsException
import cats.syntax.apply._
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model.HttpHeader.ParsingResult.Ok
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.stream.ActorMaterializer
Expand Down Expand Up @@ -295,9 +297,18 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
}
}

// Headers that should be included with all requests to the TES server
private def requestHeaders: List[HttpHeader] =
tesConfiguration.token.flatMap { t =>
HttpHeader.parse("Authorization", t) match {
case Ok(header, _) => Some(header)
case _ => None
}
}.toList

private def makeRequest[A](request: HttpRequest)(implicit um: Unmarshaller[ResponseEntity, A]): Future[A] = {
for {
response <- withRetry(() => Http().singleRequest(request))
response <- withRetry(() => Http().singleRequest(request.withHeaders(requestHeaders)))
data <- if (response.status.isFailure()) {
response.entity.dataBytes.runFold(ByteString(""))(_ ++ _).map(_.utf8String) flatMap { errorBody =>
Future.failed(new RuntimeException(s"Failed TES request: Code ${response.status.intValue()}, Body = $errorBody"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ class TesConfiguration(val configurationDescriptor: BackendConfigurationDescript
.map(SimpleExponentialBackoff(_))
.getOrElse(TesConfiguration.defaultExecOrRecoverBackoff)

// Used for testing only. Include a bearer token for authenticating with the TES server
final val bearerPrefix: String = "Bearer "
val token: Option[String] = {
configurationDescriptor.backendConfig.as[Option[String]]("bearer-token").map { t =>
if (!t.startsWith(bearerPrefix))
s"${bearerPrefix}${t}"
else t
}
}
}

object TesConfiguration {
Expand Down