diff --git a/build.sbt b/build.sbt index e539e5f132b..a12a94b3b82 100644 --- a/build.sbt +++ b/build.sbt @@ -90,6 +90,7 @@ lazy val azureBlobFileSystem = (project in file("filesystems/blob")) .dependsOn(core) .dependsOn(core % "test->test") .dependsOn(common % "test->test") + .dependsOn(cloudSupport) lazy val awsS3FileSystem = (project in file("filesystems/s3")) .withLibrarySettings("cromwell-aws-s3filesystem", s3FileSystemDependencies) diff --git a/centaur/src/main/resources/azureBlobTestCases/azure_blob_storage_read.test b/centaur/src/main/resources/azureBlobTestCases/azure_blob_storage_read.test index 123a2745e6d..b7270ccfb11 100644 --- a/centaur/src/main/resources/azureBlobTestCases/azure_blob_storage_read.test +++ b/centaur/src/main/resources/azureBlobTestCases/azure_blob_storage_read.test @@ -14,3 +14,11 @@ metadata { status: Succeeded "outputs.azure_blob_storage_read.s1": "This is my test file! Did it work??" } + +# az:// is the root of the container specified in reference.conf. +# Here, we verify that exactly one log was written. + +fileSystemCheck: "blob" +outputExpectations: { + "az://test-cromwell-workflow-logs/workflow.<>.log" : 1 +} diff --git a/centaur/src/main/resources/reference.conf b/centaur/src/main/resources/reference.conf index a0428e01235..d570b5beef3 100644 --- a/centaur/src/main/resources/reference.conf +++ b/centaur/src/main/resources/reference.conf @@ -86,5 +86,11 @@ centaur { include "centaur_aws_credentials.conf" } + azure { + container: "test-blob" + endpoint: "https://centaurtesting.blob.core.windows.net" + subscription: "62b22893-6bc1-46d9-8a90-806bb3cce3c9" + } + log-request-failures = false } diff --git a/centaur/src/main/scala/centaur/test/FilesChecker.scala b/centaur/src/main/scala/centaur/test/FilesChecker.scala index 240552bb755..5f3f1d9e170 100644 --- a/centaur/src/main/scala/centaur/test/FilesChecker.scala +++ b/centaur/src/main/scala/centaur/test/FilesChecker.scala @@ -41,3 +41,14 @@ object AWSFilesChecker extends FilesChecker { override def countObjectsAtPath: String => Int = s3Client.countObjects(s3PrefixRegex) } + +object BlobFilesChecker extends FilesChecker { + import ObjectCounterInstances.blobObjectCounter + import ObjectCounterSyntax._ + + private lazy val containerClient = Operations.blobContainerClient + + // The root of the endpoint + container specified in reference.conf will be substituted for az:// + private val azurePrefixRange = "^az:\\/\\/.*" + override def countObjectsAtPath: String => Int = ObjectCounterSyntax(containerClient).countObjects(azurePrefixRange) +} diff --git a/centaur/src/main/scala/centaur/test/ObjectCounter.scala b/centaur/src/main/scala/centaur/test/ObjectCounter.scala index 46affc7d552..124f78e1dc8 100644 --- a/centaur/src/main/scala/centaur/test/ObjectCounter.scala +++ b/centaur/src/main/scala/centaur/test/ObjectCounter.scala @@ -1,5 +1,6 @@ package centaur.test +import com.azure.storage.blob.BlobContainerClient import com.google.cloud.storage.Storage.BlobListOption import com.google.cloud.storage.{Blob, Storage} import software.amazon.awssdk.services.s3.S3Client @@ -38,6 +39,25 @@ object ObjectCounterInstances { storage.list(g.bucket, BlobListOption.prefix(g.directory)).iterateAll.asScala listObjectsAtPath(_).size } + + implicit val blobObjectCounter: ObjectCounter[BlobContainerClient] = (containerClient : BlobContainerClient) => { + val pathToInt: Path => Int = providedPath => { + //Our path parsing is somewhat GCP centric. Convert to a blob path starting from the container root. + def pathToBlobPath(parsedPath : Path) : String = { + (Option(parsedPath.bucket), Option(parsedPath.directory)) match { + case (None, _) => "" + case (Some(_), None) => parsedPath.bucket + case (Some(_), Some(_)) => parsedPath.bucket + "/" + parsedPath.directory + } + } + + val fullPath = pathToBlobPath(providedPath) + val blobsInFolder = containerClient.listBlobsByHierarchy(fullPath) + //if something "isPrefix", it's a directory. Otherwise, its a file. We just want to count files. + blobsInFolder.asScala.count(!_.isPrefix) + } + pathToInt(_) + } } object ObjectCounterSyntax { diff --git a/centaur/src/main/scala/centaur/test/Test.scala b/centaur/src/main/scala/centaur/test/Test.scala index 470de959381..d0a56a2cbf5 100644 --- a/centaur/src/main/scala/centaur/test/Test.scala +++ b/centaur/src/main/scala/centaur/test/Test.scala @@ -10,6 +10,7 @@ import centaur.test.metadata.WorkflowFlatMetadata import centaur.test.metadata.WorkflowFlatMetadata._ import centaur.test.submit.SubmitHttpResponse import centaur.test.workflow.Workflow +import com.azure.storage.blob.BlobContainerClient import com.google.api.services.genomics.v2alpha1.{Genomics, GenomicsScopes} import com.google.api.services.storage.StorageScopes import com.google.auth.Credentials @@ -23,6 +24,7 @@ import configs.syntax._ import cromwell.api.CromwellClient.UnsuccessfulRequestException import cromwell.api.model.{CallCacheDiff, Failed, HashDifference, SubmittedWorkflow, Succeeded, TerminalStatus, WaasDescription, WorkflowId, WorkflowMetadata, WorkflowStatus} import cromwell.cloudsupport.aws.AwsConfiguration +import cromwell.cloudsupport.azure.{AzureUtils} import cromwell.cloudsupport.gcp.GoogleConfiguration import cromwell.cloudsupport.gcp.auth.GoogleAuthMode import io.circe.parser._ @@ -150,6 +152,13 @@ object Operations extends StrictLogging { .build() } + lazy val azureConfig: Config = CentaurConfig.conf.getConfig("azure") + val azureSubscription = azureConfig.getString("subscription") + val blobContainer = azureConfig.getString("container") + val azureEndpoint = azureConfig.getString("endpoint") + //NB: Centaur will throw an exception if it isn't able to authenticate with Azure blob storage via the local environment. + lazy val blobContainerClient: BlobContainerClient = AzureUtils.buildContainerClientFromLocalEnvironment(blobContainer, azureEndpoint, Option(azureSubscription)).get + def submitWorkflow(workflow: Workflow): Test[SubmittedWorkflow] = { new Test[SubmittedWorkflow] { override def run: IO[SubmittedWorkflow] = for { diff --git a/centaur/src/main/scala/centaur/test/workflow/DirectoryContentCountCheck.scala b/centaur/src/main/scala/centaur/test/workflow/DirectoryContentCountCheck.scala index c5813165aea..2bf90619dab 100644 --- a/centaur/src/main/scala/centaur/test/workflow/DirectoryContentCountCheck.scala +++ b/centaur/src/main/scala/centaur/test/workflow/DirectoryContentCountCheck.scala @@ -2,7 +2,7 @@ package centaur.test.workflow import cats.data.Validated._ import cats.syntax.all._ -import centaur.test.{AWSFilesChecker, FilesChecker, LocalFilesChecker, PipelinesFilesChecker} +import centaur.test.{AWSFilesChecker, FilesChecker, LocalFilesChecker, PipelinesFilesChecker, BlobFilesChecker} import com.typesafe.config.Config import common.validation.ErrorOr.ErrorOr import configs.Result @@ -25,8 +25,9 @@ object DirectoryContentCountCheck { case Result.Success("gcs") => valid(PipelinesFilesChecker) case Result.Success("local") => valid(LocalFilesChecker) case Result.Success("aws") => valid(AWSFilesChecker) - case Result.Success(_) => invalidNel(s"Test '$name': Invalid 'fileSystemCheck' value (must be either 'local', 'gcs' or 'aws'") - case Result.Failure(_) => invalidNel(s"Test '$name': Must specify a 'fileSystemCheck' value (must be either 'local', 'gcs' or 'aws'") + case Result.Success("blob") => valid(BlobFilesChecker) + case Result.Success(_) => invalidNel(s"Test '$name': Invalid 'fileSystemCheck' value (must be either 'local', 'gcs', 'blob', or 'aws'") + case Result.Failure(_) => invalidNel(s"Test '$name': Must specify a 'fileSystemCheck' value (must be either 'local', 'gcs', 'blob', or 'aws'") } (directoryContentCountsValidation, fileSystemChecker) mapN { (d, f) => Option(DirectoryContentCountCheck(d, f)) } diff --git a/cloudSupport/src/main/scala/cromwell/cloudsupport/azure/AzureUtils.scala b/cloudSupport/src/main/scala/cromwell/cloudsupport/azure/AzureUtils.scala new file mode 100644 index 00000000000..09cf5f3869d --- /dev/null +++ b/cloudSupport/src/main/scala/cromwell/cloudsupport/azure/AzureUtils.scala @@ -0,0 +1,65 @@ +package cromwell.cloudsupport.azure + +import com.azure.core.management.AzureEnvironment +import com.azure.core.management.profile.AzureProfile +import com.azure.identity.DefaultAzureCredentialBuilder +import com.azure.resourcemanager.AzureResourceManager +import com.azure.resourcemanager.storage.models.StorageAccountKey +import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder} +import com.azure.storage.common.StorageSharedKeyCredential +import com.google.common.net.UrlEscapers + +import java.net.URI +import scala.jdk.CollectionConverters.IterableHasAsScala +import scala.util.{Failure, Success, Try} + +object AzureUtils { + /** + * Generates a BlobContainerClient that can interact with the specified container. Authenticates using the local azure client running on the same machine. + * @param blobContainer Name of the blob container. Looks something like "my-blob-container". + * @param azureEndpoint Azure endpoint of the container. Looks something like https://somedomain.blob.core.windows.net. + * @param subscription Azure subscription. A globally unique identifier. If not provided, a default subscription will be used. + * @return A blob container client capable of interacting with the specified container. + */ + def buildContainerClientFromLocalEnvironment(blobContainer: String, azureEndpoint: String, subscription : Option[String]): Try[BlobContainerClient] = { + def parseURI(string: String): Try[URI] = Try(URI.create(UrlEscapers.urlFragmentEscaper().escape(string))) + def parseStorageAccount(uri: URI): Try[String] = uri.getHost.split("\\.").find(_.nonEmpty) + .map(Success(_)).getOrElse(Failure(new Exception("Could not parse storage account"))) + + val azureProfile = new AzureProfile(AzureEnvironment.AZURE) + + def azureCredentialBuilder = new DefaultAzureCredentialBuilder() + .authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint) + .build + + def authenticateWithSubscription(sub: String) = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription(sub) + + def authenticateWithDefaultSubscription = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withDefaultSubscription() + + def azure = subscription.map(authenticateWithSubscription(_)).getOrElse(authenticateWithDefaultSubscription) + + def findAzureStorageAccount(storageAccountName: String) = azure.storageAccounts.list.asScala.find(_.name.equals(storageAccountName)) + .map(Success(_)).getOrElse(Failure(new Exception("Azure Storage Account not found."))) + + def buildBlobContainerClient(credential: StorageSharedKeyCredential, endpointURL: String, blobContainerName: String): BlobContainerClient = { + new BlobContainerClientBuilder() + .credential(credential) + .endpoint(endpointURL) + .containerName(blobContainerName) + .buildClient() + } + + def generateBlobContainerClient: Try[BlobContainerClient] = for { + uri <- parseURI(azureEndpoint) + configuredAccount <- parseStorageAccount(uri) + azureAccount <- findAzureStorageAccount(configuredAccount) + keys = azureAccount.getKeys.asScala + key <- keys.headOption.fold[Try[StorageAccountKey]](Failure(new Exception("Storage account has no keys")))(Success(_)) + first = key.value + sskc = new StorageSharedKeyCredential(configuredAccount, first) + bcc = buildBlobContainerClient(sskc, azureEndpoint, blobContainer) + } yield bcc + + generateBlobContainerClient + } +} diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala index 1672bd5bd88..e50446ea294 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala @@ -1,18 +1,12 @@ package cromwell.filesystems.blob import com.azure.core.credential.AzureSasCredential -import com.azure.core.management.AzureEnvironment -import com.azure.core.management.profile.AzureProfile -import com.azure.identity.DefaultAzureCredentialBuilder -import com.azure.resourcemanager.AzureResourceManager -import com.azure.resourcemanager.storage.models.StorageAccountKey import com.azure.storage.blob.nio.AzureFileSystem import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues} -import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder} -import com.azure.storage.common.StorageSharedKeyCredential import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import common.validation.Validation._ +import cromwell.cloudsupport.azure.AzureUtils import java.net.URI import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems} @@ -223,22 +217,6 @@ case class WSMBlobSasTokenGenerator(container: BlobContainerName, } case class NativeBlobSasTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[SubscriptionId] = None) extends BlobSasTokenGenerator { - private val azureProfile = new AzureProfile(AzureEnvironment.AZURE) - private def azureCredentialBuilder = new DefaultAzureCredentialBuilder() - .authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint) - .build - private def authenticateWithSubscription(sub: SubscriptionId) = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription(sub.toString) - private def authenticateWithDefaultSubscription = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withDefaultSubscription() - private def azure = subscription.map(authenticateWithSubscription(_)).getOrElse(authenticateWithDefaultSubscription) - private def findAzureStorageAccount(name: StorageAccountName) = azure.storageAccounts.list.asScala.find(_.name.equals(name.value)) - .map(Success(_)).getOrElse(Failure(new Exception("Azure Storage Account not found"))) - private def buildBlobContainerClient(credential: StorageSharedKeyCredential, endpoint: EndpointURL, container: BlobContainerName): BlobContainerClient = { - new BlobContainerClientBuilder() - .credential(credential) - .endpoint(endpoint.value) - .containerName(container.value) - .buildClient() - } private val bcsp = new BlobContainerSasPermission() .setReadPermission(true) .setCreatePermission(true) @@ -252,14 +230,7 @@ case class NativeBlobSasTokenGenerator(container: BlobContainerName, endpoint: E * @return an AzureSasCredential for accessing a blob container */ def generateBlobSasToken: Try[AzureSasCredential] = for { - uri <- BlobPathBuilder.parseURI(endpoint.value) - configuredAccount <- BlobPathBuilder.parseStorageAccount(uri) - azureAccount <- findAzureStorageAccount(configuredAccount) - keys = azureAccount.getKeys.asScala - key <- keys.headOption.fold[Try[StorageAccountKey]](Failure(new Exception("Storage account has no keys")))(Success(_)) - first = key.value - sskc = new StorageSharedKeyCredential(configuredAccount.value, first) - bcc = buildBlobContainerClient(sskc, endpoint, container) + bcc <- AzureUtils.buildContainerClientFromLocalEnvironment(container.toString, endpoint.toString, subscription.map(_.toString)) bsssv = new BlobServiceSasSignatureValues(OffsetDateTime.now.plusDays(1), bcsp) asc = new AzureSasCredential(bcc.generateSas(bsssv)) } yield asc diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ddc3b002f00..c98eba6d346 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -206,6 +206,7 @@ object Dependencies { exclude("jakarta.xml.bind", "jakarta.xml.bind-api") exclude("jakarta.activation", "jakarta.activation-api"), "com.azure" % "azure-core-management" % "1.7.1", + "com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % jacksonV, "com.azure.resourcemanager" % "azure-resourcemanager" % "2.18.0" ) @@ -409,7 +410,7 @@ object Dependencies { "com.lihaoyi" %% "pprint" % pprintV, ) ++ catsDependencies ++ configDependencies ++ slf4jFacadeDependencies ++ refinedTypeDependenciesList - val cloudSupportDependencies: List[ModuleID] = googleApiClientDependencies ++ googleCloudDependencies ++ betterFilesDependencies ++ awsCloudDependencies + val cloudSupportDependencies: List[ModuleID] = googleApiClientDependencies ++ googleCloudDependencies ++ betterFilesDependencies ++ awsCloudDependencies ++ azureDependencies val databaseSqlDependencies: List[ModuleID] = List( "commons-io" % "commons-io" % commonsIoV,