Skip to content

Commit

Permalink
Progress on DRS
Browse files Browse the repository at this point in the history
  • Loading branch information
jgainerdewar committed Jan 28, 2025
1 parent be3a8a3 commit 0c32295
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ class DrsCloudNioFileProvider(drsPathResolver: DrsPathResolver, drsReadInterpret
val fileAttributesIO = for {
drsResolverResponse <- drsPathResolver.resolveDrs(drsPath, fields)
sizeOption = drsResolverResponse.size
hashOption = getPreferredHash(drsResolverResponse.hashes)
hashOptions = getHashOptions(drsResolverResponse.hashes)
timeCreatedOption <- convertToFileTime(drsPath, DrsResolverField.TimeCreated, drsResolverResponse.timeCreated)
timeUpdatedOption <- convertToFileTime(drsPath, DrsResolverField.TimeUpdated, drsResolverResponse.timeUpdated)
} yield new DrsCloudNioRegularFileAttributes(drsPath, sizeOption, hashOption, timeCreatedOption, timeUpdatedOption)
} yield new DrsCloudNioRegularFileAttributes(drsPath, sizeOption, hashOptions, timeCreatedOption, timeUpdatedOption)

Option(fileAttributesIO.unsafeRunSync())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package cloud.nio.impl.drs
import java.nio.file.attribute.FileTime
import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset}
import cats.effect.IO
import cloud.nio.spi.HashType.HashType
import cloud.nio.spi.HashType._
import cloud.nio.spi.{CloudNioRegularFileAttributes, FileHash, HashType}
import io.netty.util.HashingStrategy
import org.apache.commons.lang3.exception.ExceptionUtils

class DrsCloudNioRegularFileAttributes(drsPath: String,
sizeOption: Option[Long],
hashOption: Option[FileHash],
var fileHashes: List[FileHash],
timeCreatedOption: Option[FileTime],
timeUpdatedOption: Option[FileTime]
) extends CloudNioRegularFileAttributes {
Expand All @@ -18,29 +19,28 @@ class DrsCloudNioRegularFileAttributes(drsPath: String,

override def size(): Long = sizeOption.getOrElse(0)

override def fileHash: Option[FileHash] = hashOption

override def creationTime(): FileTime = timeCreatedOption.getOrElse(lastModifiedTime())

override def lastModifiedTime(): FileTime = timeUpdatedOption.getOrElse(FileTime.fromMillis(0))
}

object DrsCloudNioRegularFileAttributes {
private val priorityHashList: Seq[(String, HashType)] = Seq(
private val priorityHashList: List[(String, HashType)] = List(
("crc32c", HashType.Crc32c),
("md5", HashType.Md5),
("sha256", HashType.Sha256),
("etag", HashType.S3Etag)
)

def getPreferredHash(hashesOption: Option[Map[String, String]]): Option[FileHash] =
def getHashOptions(hashesOption: Option[Map[String, String]]): Seq[FileHash] =
hashesOption match {
case Some(hashes: Map[String, String]) if hashes.nonEmpty =>
priorityHashList collectFirst {
priorityHashList collect {
case (key, hashType) if hashes.contains(key) => FileHash(hashType, hashes(key))
}
// if no preferred hash was found, go ahead and return none because we don't support anything that the DRS object is offering
case _ => None
// if no preferred hash was found, go ahead and return an empty seq because we don't support anything that the
// DRS object is offering
case _ => List.empty
}

private def convertToOffsetDateTime(timeInString: String): IO[OffsetDateTime] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class DrsCloudNioFileProviderSpec extends AnyFlatSpecLike with CromwellTimeoutSp
drsFileAttributes.creationTime().toMillis should be(123L)
drsFileAttributes.lastModifiedTime().toMillis should be(456L)
drsFileAttributes.size() should be(789L)
drsFileAttributes.fileHash should be(Option(FileHash(HashType.Md5, "gg0217869")))
drsFileAttributes.fileHashes should be(Option(FileHash(HashType.Md5, "gg0217869")))
}

it should "throw exceptions for unsupported methods" in {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package cloud.nio.impl.ftp

import java.nio.file.attribute.FileTime

import cloud.nio.spi.CloudNioRegularFileAttributes
import cloud.nio.spi.{CloudNioRegularFileAttributes, FileHash}
import org.apache.commons.net.ftp.FTPFile

class FtpCloudNioRegularFileAttributes(file: FTPFile, key: String) extends CloudNioRegularFileAttributes {
override def fileHash = None
override def fileHashes: List[FileHash] = List.empty
override def lastModifiedTime() = FileTime.from(file.getTimestamp.toInstant)
override def size() = file.getSize
override def fileKey() = key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class FtpCloudNioFileProviderSpec extends AnyFlatSpec with CromwellTimeoutSpec w
attributes.size() shouldBe 6L
attributes.isDirectory shouldBe false
attributes.isRegularFile shouldBe true
attributes.fileHash shouldBe None
attributes.fileHashes shouldBe None
attributes.fileKey shouldBe "localhost/root/fileAttributes/file"
attributes.lastModifiedTime() should not be null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.net.URI
import java.nio.channels.ReadableByteChannel
import java.nio.file.FileAlreadyExistsException
import cloud.nio.impl.ftp.FtpUtil.FtpIoException
import cloud.nio.spi.{CloudNioRegularFileAttributes, CloudNioRetry}
import cloud.nio.spi.{CloudNioRegularFileAttributes, CloudNioRetry, FileHash}
import com.typesafe.config.ConfigFactory
import common.assertion.CromwellTimeoutSpec
import common.mock.MockSugar
Expand Down Expand Up @@ -58,7 +58,7 @@ class FtpCloudNioFileSystemProviderSpec
override def fileAttributes(cloudHost: String, cloudPath: String): Option[CloudNioRegularFileAttributes] =
Option(
new CloudNioRegularFileAttributes {
override def fileHash = throw new UnsupportedOperationException()
override def fileHashes(): List[FileHash] = throw new UnsupportedOperationException()
override def lastModifiedTime() = throw new UnsupportedOperationException()
override def size(): Long = mockSizeFunction()
override def fileKey() = throw new UnsupportedOperationException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cloud.nio.spi.CloudNioFileAttributes._
import cloud.nio.spi.HashType.HashType

trait CloudNioFileAttributes extends BasicFileAttributes {
def fileHash: Option[FileHash]
def fileHashes: List[FileHash]
}

object CloudNioFileAttributes {
Expand Down Expand Up @@ -53,5 +53,5 @@ final case class CloudNioDirectoryAttributes(path: CloudNioPath) extends CloudNi

override val fileKey: AnyRef = path

override val fileHash: Option[FileHash] = None
override val fileHashes: List[FileHash] = List.empty
}
25 changes: 17 additions & 8 deletions engine/src/main/scala/cromwell/engine/io/nio/NioHashing.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,8 @@ object NioHashing {
file match {
case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath, hashStrategy).map(Option(_))
case blobPath: BlobPath => getFileHashForBlobPath(blobPath, hashStrategy)
case drsPath: DrsPath =>
IO {
// We assume all DRS files have a stored hash; this will throw
// if the file does not.
drsPath.getFileHash
}.map(Option(_))
case s3Path: S3Path => getFileHashForS3Path(s3Path(s3Path, hashStrategy))
case drsPath: DrsPath => getFileHashForDrsPath(drsPath, hashStrategy)
case s3Path: S3Path => getFileHashForS3Path(s3Path, hashStrategy)
case _ => IO.pure(None)
}

Expand Down Expand Up @@ -79,7 +74,7 @@ object NioHashing {
val cloudFile = gcsPath.objectBlobId.map(id => gcsPath.cloudStorage.get(id))
cloudFile.map(f =>
hashStrategy match {
case FileHashStrategy.Crc32c => Option(f.getCrc32c).map(FileHash(HashType.GcsCrc32c, _))
case FileHashStrategy.Crc32c => Option(f.getCrc32c).map(FileHash(HashType.Crc32c, _))
case FileHashStrategy.Md5 => Option(f.getMd5).map(FileHash(HashType.Md5, _))
// TODO check whether this blob id toString is the same as GcsBatch id, I don't think it is
case FileHashStrategy.Md5ThenIdentity =>
Expand All @@ -97,6 +92,20 @@ object NioHashing {
}
}

private def getFileHashForDrsPath(drsPath: DrsPath, hashStrategy: FileHashStrategy): IO[Option[FileHash]] =
IO {
// TODO this is also used for checksumming local engine downloads!
// TODO need to use multiple hash strategies for DRS (etag for s3, md5 for google, etc)
val drsHashes = drsPath.getFileHashes
hashStrategy match {
case FileHashStrategy.Crc32c => drsHashes.find(_.hashType == HashType.Crc32c)
case FileHashStrategy.Md5 => drsHashes.find(_.hashType == HashType.Md5)
case FileHashStrategy.Sha256 => drsHashes.find(_.hashType == HashType.Sha256)
case FileHashStrategy.ETag => drsHashes.find(_.hashType == HashType.S3Etag)
case _ => None
}
}

private def getFileHashForS3Path(s3Path: S3Path, hashStrategy: FileHashStrategy): IO[Option[FileHash]] =
IO {
hashStrategy match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cromwell.filesystems.drs

import cloud.nio.impl.drs.DrsCloudNioFileSystemProvider
import cloud.nio.spi.{CloudNioPath, FileHash}
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.path.{NioPath, Path}

import java.io.IOException
Expand All @@ -19,20 +20,17 @@ case class DrsPath(drsPath: CloudNioPath, requesterPaysProjectIdOption: Option[S

override def pathWithoutScheme: String = pathAsString.stripPrefix(drsPath.getFileSystem.provider.getScheme + "://")

def getFileHash: FileHash = {
def getFileHashes: List[FileHash] = {
val drsFileSystemProvider = drsPath.getFileSystem.provider.asInstanceOf[DrsCloudNioFileSystemProvider]

val fileAttributesOption = drsFileSystemProvider.fileProvider.fileAttributes(drsPath.cloudHost, drsPath.cloudPath)

fileAttributesOption match {
case Some(fileAttributes) if fileAttributes.fileHashes.nonEmpty => fileAttributes.fileHashes
case Some(fileAttributes) =>
fileAttributes.fileHash match {
case Some(fileHash) => fileHash
case None =>
throw new IOException(
s"Error while resolving DRS path $this. The response from DRS Resolver doesn't contain the 'md5' hash for the file."
)
}
throw new IOException(
s"Error while resolving DRS path $this. The response from DRS Resolver doesn't contain any known hashes for the file."
)
case None =>
throw new IOException(
s"Error getting file hash of DRS path $this. Reason: File attributes class DrsCloudNioRegularFileAttributes wasn't defined in DrsCloudNioFileProvider."
Expand Down

0 comments on commit 0c32295

Please sign in to comment.