Skip to content

Commit

Permalink
ID-125 Add support for drshub, rename all the things (#6959)
Browse files Browse the repository at this point in the history
* Add support for drshub, rename all the things

* fallback to martha if resolver is not in config
  • Loading branch information
tlangs authored Dec 15, 2022
1 parent 308714a commit aa9e876
Show file tree
Hide file tree
Showing 40 changed files with 298 additions and 294 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
# Martha does not return service accounts for JDR paths. Therefore they shouldn't need to be localized using the
# DRSHub does not return service accounts for JDR paths. Therefore they shouldn't need to be localized using the
# Cromwell custom DOS/DRS localizer.
#
# However, the first file1 was generated before JDR started saving file names at the end of the gsUri.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: drs_usa_jdr
testFormat: WorkflowSuccess
backends: ["papi-v2-usa"]
tags: [ drs ]
skipDescribeEndpointValidation: true

files {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: drs_usa_jdr_preresolve
testFormat: WorkflowSuccess
backends: ["papi-v2-usa"]
tags: [ drs ]
skipDescribeEndpointValidation: true

files {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@ import org.apache.http.HttpStatus
class DrsCloudNioFileProvider(drsPathResolver: EngineDrsPathResolver,
drsReadInterpreter: DrsReadInterpreter) extends CloudNioFileProvider {

private def checkIfPathExistsThroughMartha(drsPath: String): IO[Boolean] = {
private def checkIfPathExistsThroughDrsResolver(drsPath: String): IO[Boolean] = {
/*
* Unlike other cloud providers where directories are identified with a trailing slash at the end like `gs://bucket/dir/`,
* DRS has a concept of bundles for directories (not supported yet). Hence for method `checkDirectoryExists` which appends a trailing '/'
* to see if the current path is a directory, return false
*/
if (drsPath.endsWith("/")) IO(false)
else {
drsPathResolver.rawMarthaResponse(drsPath, NonEmptyList.one(MarthaField.GsUri)).use { marthaResponse =>
val errorMsg = s"Status line was null for martha response $marthaResponse."
toIO(Option(marthaResponse.getStatusLine), errorMsg)
drsPathResolver.rawDrsResolverResponse(drsPath, NonEmptyList.one(DrsResolverField.GsUri)).use { drsResolverResponse =>
val errorMsg = s"Status line was null for DRS Resolver response $drsResolverResponse."
toIO(Option(drsResolverResponse.getStatusLine), errorMsg)
}.map(_.getStatusCode == HttpStatus.SC_OK)
}
}

override def existsPath(drsPath: String, unused: String): Boolean =
checkIfPathExistsThroughMartha(drsPath).unsafeRunSync()
checkIfPathExistsThroughDrsResolver(drsPath).unsafeRunSync()

override def existsPaths(cloudHost: String, cloudPathPrefix: String): Boolean =
existsPath(cloudHost, cloudPathPrefix)
Expand All @@ -47,11 +47,11 @@ class DrsCloudNioFileProvider(drsPathResolver: EngineDrsPathResolver,
throw new UnsupportedOperationException("DRS currently doesn't support delete.")

override def read(drsPath: String, unused: String, offset: Long): ReadableByteChannel = {
val fields = NonEmptyList.of(MarthaField.GsUri, MarthaField.GoogleServiceAccount, MarthaField.AccessUrl)
val fields = NonEmptyList.of(DrsResolverField.GsUri, DrsResolverField.GoogleServiceAccount, DrsResolverField.AccessUrl)

val byteChannelIO = for {
marthaResponse <- drsPathResolver.resolveDrsThroughMartha(drsPath, fields)
byteChannel <- drsReadInterpreter(drsPathResolver, marthaResponse)
drsResolverResponse <- drsPathResolver.resolveDrs(drsPath, fields)
byteChannel <- drsReadInterpreter(drsPathResolver, drsResolverResponse)
} yield byteChannel

byteChannelIO.handleErrorWith {
Expand All @@ -63,20 +63,20 @@ class DrsCloudNioFileProvider(drsPathResolver: EngineDrsPathResolver,
throw new UnsupportedOperationException("DRS currently doesn't support write.")

override def fileAttributes(drsPath: String, unused: String): Option[CloudNioRegularFileAttributes] = {
val fields = NonEmptyList.of(MarthaField.Size, MarthaField.TimeCreated, MarthaField.TimeUpdated, MarthaField.Hashes)
val fields = NonEmptyList.of(DrsResolverField.Size, DrsResolverField.TimeCreated, DrsResolverField.TimeUpdated, DrsResolverField.Hashes)

val fileAttributesIO = for {
marthaResponse <- drsPathResolver.resolveDrsThroughMartha(drsPath, fields)
sizeOption = marthaResponse.size
hashOption = getPreferredHash(marthaResponse.hashes)
timeCreatedOption <- convertToFileTime(drsPath, MarthaField.TimeCreated, marthaResponse.timeCreated)
timeUpdatedOption <- convertToFileTime(drsPath, MarthaField.TimeUpdated, marthaResponse.timeUpdated)
drsResolverResponse <- drsPathResolver.resolveDrs(drsPath, fields)
sizeOption = drsResolverResponse.size
hashOption = getPreferredHash(drsResolverResponse.hashes)
timeCreatedOption <- convertToFileTime(drsPath, DrsResolverField.TimeCreated, drsResolverResponse.timeCreated)
timeUpdatedOption <- convertToFileTime(drsPath, DrsResolverField.TimeUpdated, drsResolverResponse.timeUpdated)
} yield new DrsCloudNioRegularFileAttributes(drsPath, sizeOption, hashOption, timeCreatedOption, timeUpdatedOption)

Option(fileAttributesIO.unsafeRunSync())
}
}

object DrsCloudNioFileProvider {
type DrsReadInterpreter = (DrsPathResolver, MarthaResponse) => IO[ReadableByteChannel]
type DrsReadInterpreter = (DrsPathResolver, DrsResolverResponse) => IO[ReadableByteChannel]
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class DrsCloudNioFileSystemProvider(rootConfig: Config,
drsReadInterpreter: DrsReadInterpreter,
) extends CloudNioFileSystemProvider {

lazy val drsConfig: DrsConfig = DrsConfig.fromConfig(rootConfig.getConfig("martha"))
lazy val drsResolverConfig = if (rootConfig.hasPath("resolver")) rootConfig.getConfig("resolver") else rootConfig.getConfig("martha")
lazy val drsConfig: DrsConfig = DrsConfig.fromConfig(drsResolverConfig)

lazy val drsPathResolver: EngineDrsPathResolver =
EngineDrsPathResolver(drsConfig, drsCredentials)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object DrsCloudNioRegularFileAttributes {
.map(FileTime.from)
}

def convertToFileTime(drsPath: String, key: MarthaField.Value, timeInStringOption: Option[String]): IO[Option[FileTime]] = {
def convertToFileTime(drsPath: String, key: DrsResolverField.Value, timeInStringOption: Option[String]): IO[Option[FileTime]] = {
timeInStringOption match {
case None => IO.pure(None)
case Some(timeInString) =>
Expand All @@ -71,7 +71,7 @@ object DrsCloudNioRegularFileAttributes {
throwable =>
IO.raiseError(
new RuntimeException(
s"Error while parsing '$key' value from Martha to FileTime for DRS path $drsPath. " +
s"Error while parsing '$key' value from DRS Resolver to FileTime for DRS path $drsPath. " +
s"Reason: ${ExceptionUtils.getMessage(throwable)}.",
throwable,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import net.ceedubs.ficus.Ficus._
import scala.concurrent.duration._
import scala.language.postfixOps

final case class DrsConfig(marthaUrl: String,
final case class DrsConfig(drsResolverUrl: String,
numRetries: Int,
waitInitial: FiniteDuration,
waitMaximum: FiniteDuration,
Expand All @@ -22,45 +22,46 @@ object DrsConfig {
private val DefaultWaitMultiplier = 1.5d
private val DefaultWaitRandomizationFactor = 0.1

private val EnvMarthaUrl = "MARTHA_URL"
private val EnvMarthaNumRetries = "MARTHA_NUM_RETRIES"
private val EnvMarthaWaitInitialSeconds = "MARTHA_WAIT_INITIAL_SECONDS"
private val EnvMarthaWaitMaximumSeconds = "MARTHA_WAIT_MAXIMUM_SECONDS"
private val EnvMarthaWaitMultiplier = "MARTHA_WAIT_MULTIPLIER"
private val EnvMarthaWaitRandomizationFactor = "MARTHA_WAIT_RANDOMIZATION_FACTOR"
private val EnvDrsResolverUrl = "DRS_RESOLVER_URL"
private val EnvDrsResolverNumRetries = "DRS_RESOLVER_NUM_RETRIES"
private val EnvDrsResolverWaitInitialSeconds = "DRS_RESOLVER_WAIT_INITIAL_SECONDS"
private val EnvDrsResolverWaitMaximumSeconds = "DRS_RESOLVER_WAIT_MAXIMUM_SECONDS"
private val EnvDrsResolverWaitMultiplier = "DRS_RESOLVER_WAIT_MULTIPLIER"
private val EnvDrsResolverWaitRandomizationFactor = "DRS_RESOLVER_WAIT_RANDOMIZATION_FACTOR"

def fromConfig(marthaConfig: Config): DrsConfig = {

def fromConfig(drsResolverConfig: Config): DrsConfig = {
DrsConfig(
marthaUrl = marthaConfig.getString("url"),
numRetries = marthaConfig.getOrElse("num-retries", DefaultNumRetries),
waitInitial = marthaConfig.getOrElse("wait-initial", DefaultWaitInitial),
waitMaximum = marthaConfig.getOrElse("wait-maximum", DefaultWaitMaximum),
waitMultiplier = marthaConfig.getOrElse("wait-multiplier", DefaultWaitMultiplier),
drsResolverUrl = drsResolverConfig.getString("url"),
numRetries = drsResolverConfig.getOrElse("num-retries", DefaultNumRetries),
waitInitial = drsResolverConfig.getOrElse("wait-initial", DefaultWaitInitial),
waitMaximum = drsResolverConfig.getOrElse("wait-maximum", DefaultWaitMaximum),
waitMultiplier = drsResolverConfig.getOrElse("wait-multiplier", DefaultWaitMultiplier),
waitRandomizationFactor =
marthaConfig.getOrElse("wait-randomization-factor", DefaultWaitRandomizationFactor),
drsResolverConfig.getOrElse("wait-randomization-factor", DefaultWaitRandomizationFactor),
)
}

def fromEnv(env: Map[String, String]): DrsConfig = {
DrsConfig(
marthaUrl = env(EnvMarthaUrl),
numRetries = env.get(EnvMarthaNumRetries).map(_.toInt).getOrElse(DefaultNumRetries),
waitInitial = env.get(EnvMarthaWaitInitialSeconds).map(_.toLong.seconds).getOrElse(DefaultWaitInitial),
waitMaximum = env.get(EnvMarthaWaitMaximumSeconds).map(_.toLong.seconds).getOrElse(DefaultWaitMaximum),
waitMultiplier = env.get(EnvMarthaWaitMultiplier).map(_.toDouble).getOrElse(DefaultWaitMultiplier),
drsResolverUrl = env(EnvDrsResolverUrl),
numRetries = env.get(EnvDrsResolverNumRetries).map(_.toInt).getOrElse(DefaultNumRetries),
waitInitial = env.get(EnvDrsResolverWaitInitialSeconds).map(_.toLong.seconds).getOrElse(DefaultWaitInitial),
waitMaximum = env.get(EnvDrsResolverWaitMaximumSeconds).map(_.toLong.seconds).getOrElse(DefaultWaitMaximum),
waitMultiplier = env.get(EnvDrsResolverWaitMultiplier).map(_.toDouble).getOrElse(DefaultWaitMultiplier),
waitRandomizationFactor =
env.get(EnvMarthaWaitRandomizationFactor).map(_.toDouble).getOrElse(DefaultWaitRandomizationFactor),
env.get(EnvDrsResolverWaitRandomizationFactor).map(_.toDouble).getOrElse(DefaultWaitRandomizationFactor),
)
}

def toEnv(drsConfig: DrsConfig): Map[String, String] = {
Map(
EnvMarthaUrl -> drsConfig.marthaUrl,
EnvMarthaNumRetries -> s"${drsConfig.numRetries}",
EnvMarthaWaitInitialSeconds -> s"${drsConfig.waitInitial.toSeconds}",
EnvMarthaWaitMaximumSeconds -> s"${drsConfig.waitMaximum.toSeconds}",
EnvMarthaWaitMultiplier -> s"${drsConfig.waitMultiplier}",
EnvMarthaWaitRandomizationFactor -> s"${drsConfig.waitRandomizationFactor}",
EnvDrsResolverUrl -> drsConfig.drsResolverUrl,
EnvDrsResolverNumRetries -> s"${drsConfig.numRetries}",
EnvDrsResolverWaitInitialSeconds -> s"${drsConfig.waitInitial.toSeconds}",
EnvDrsResolverWaitMaximumSeconds -> s"${drsConfig.waitMaximum.toSeconds}",
EnvDrsResolverWaitMultiplier -> s"${drsConfig.waitMultiplier}",
EnvDrsResolverWaitRandomizationFactor -> s"${drsConfig.waitRandomizationFactor}",
)
}
}
Loading

0 comments on commit aa9e876

Please sign in to comment.