Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ID-125 Add support for drshub, rename all the things #6959

Merged
merged 8 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
# Martha does not return service accounts for JDR paths. Therefore they shouldn't need to be localized using the
# DRSHub does not return service accounts for JDR paths. Therefore they shouldn't need to be localized using the
# Cromwell custom DOS/DRS localizer.
#
# However, the first file1 was generated before JDR stared saving file names at the end of the gsUri.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: drs_usa_jdr
testFormat: WorkflowSuccess
backends: ["papi-v2-usa"]
tags: [ drs ]
skipDescribeEndpointValidation: true

files {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: drs_usa_jdr_preresolve
testFormat: WorkflowSuccess
backends: ["papi-v2-usa"]
tags: [ drs ]
skipDescribeEndpointValidation: true

files {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@ import org.apache.http.HttpStatus
class DrsCloudNioFileProvider(drsPathResolver: EngineDrsPathResolver,
drsReadInterpreter: DrsReadInterpreter) extends CloudNioFileProvider {

private def checkIfPathExistsThroughMartha(drsPath: String): IO[Boolean] = {
private def checkIfPathExistsThroughDrsResolver(drsPath: String): IO[Boolean] = {
/*
* Unlike other cloud providers where directories are identified with a trailing slash at the end like `gs://bucket/dir/`,
* DRS has a concept of bundles for directories (not supported yet). Hence for method `checkDirectoryExists` which appends a trailing '/'
* to see if the current path is a directory, return false
*/
if (drsPath.endsWith("/")) IO(false)
else {
drsPathResolver.rawMarthaResponse(drsPath, NonEmptyList.one(MarthaField.GsUri)).use { marthaResponse =>
val errorMsg = s"Status line was null for martha response $marthaResponse."
toIO(Option(marthaResponse.getStatusLine), errorMsg)
drsPathResolver.rawDrsResolverResponse(drsPath, NonEmptyList.one(DrsResolverField.GsUri)).use { drsResolverResponse =>
val errorMsg = s"Status line was null for DRS Resolver response $drsResolverResponse."
toIO(Option(drsResolverResponse.getStatusLine), errorMsg)
}.map(_.getStatusCode == HttpStatus.SC_OK)
}
}

override def existsPath(drsPath: String, unused: String): Boolean =
checkIfPathExistsThroughMartha(drsPath).unsafeRunSync()
checkIfPathExistsThroughDrsResolver(drsPath).unsafeRunSync()

override def existsPaths(cloudHost: String, cloudPathPrefix: String): Boolean =
existsPath(cloudHost, cloudPathPrefix)
Expand All @@ -47,11 +47,11 @@ class DrsCloudNioFileProvider(drsPathResolver: EngineDrsPathResolver,
throw new UnsupportedOperationException("DRS currently doesn't support delete.")

override def read(drsPath: String, unused: String, offset: Long): ReadableByteChannel = {
val fields = NonEmptyList.of(MarthaField.GsUri, MarthaField.GoogleServiceAccount, MarthaField.AccessUrl)
val fields = NonEmptyList.of(DrsResolverField.GsUri, DrsResolverField.GoogleServiceAccount, DrsResolverField.AccessUrl)

val byteChannelIO = for {
marthaResponse <- drsPathResolver.resolveDrsThroughMartha(drsPath, fields)
byteChannel <- drsReadInterpreter(drsPathResolver, marthaResponse)
drsResolverResponse <- drsPathResolver.resolveDrs(drsPath, fields)
byteChannel <- drsReadInterpreter(drsPathResolver, drsResolverResponse)
} yield byteChannel

byteChannelIO.handleErrorWith {
Expand All @@ -63,20 +63,20 @@ class DrsCloudNioFileProvider(drsPathResolver: EngineDrsPathResolver,
throw new UnsupportedOperationException("DRS currently doesn't support write.")

override def fileAttributes(drsPath: String, unused: String): Option[CloudNioRegularFileAttributes] = {
val fields = NonEmptyList.of(MarthaField.Size, MarthaField.TimeCreated, MarthaField.TimeUpdated, MarthaField.Hashes)
val fields = NonEmptyList.of(DrsResolverField.Size, DrsResolverField.TimeCreated, DrsResolverField.TimeUpdated, DrsResolverField.Hashes)

val fileAttributesIO = for {
marthaResponse <- drsPathResolver.resolveDrsThroughMartha(drsPath, fields)
sizeOption = marthaResponse.size
hashOption = getPreferredHash(marthaResponse.hashes)
timeCreatedOption <- convertToFileTime(drsPath, MarthaField.TimeCreated, marthaResponse.timeCreated)
timeUpdatedOption <- convertToFileTime(drsPath, MarthaField.TimeUpdated, marthaResponse.timeUpdated)
drsResolverResponse <- drsPathResolver.resolveDrs(drsPath, fields)
sizeOption = drsResolverResponse.size
hashOption = getPreferredHash(drsResolverResponse.hashes)
timeCreatedOption <- convertToFileTime(drsPath, DrsResolverField.TimeCreated, drsResolverResponse.timeCreated)
timeUpdatedOption <- convertToFileTime(drsPath, DrsResolverField.TimeUpdated, drsResolverResponse.timeUpdated)
} yield new DrsCloudNioRegularFileAttributes(drsPath, sizeOption, hashOption, timeCreatedOption, timeUpdatedOption)

Option(fileAttributesIO.unsafeRunSync())
}
}

object DrsCloudNioFileProvider {
type DrsReadInterpreter = (DrsPathResolver, MarthaResponse) => IO[ReadableByteChannel]
type DrsReadInterpreter = (DrsPathResolver, DrsResolverResponse) => IO[ReadableByteChannel]
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class DrsCloudNioFileSystemProvider(rootConfig: Config,
drsReadInterpreter: DrsReadInterpreter,
) extends CloudNioFileSystemProvider {

lazy val drsConfig: DrsConfig = DrsConfig.fromConfig(rootConfig.getConfig("martha"))
lazy val drsResolverConfig = if (rootConfig.hasPath("resolver")) rootConfig.getConfig("resolver") else rootConfig.getConfig("martha")
lazy val drsConfig: DrsConfig = DrsConfig.fromConfig(drsResolverConfig)

lazy val drsPathResolver: EngineDrsPathResolver =
EngineDrsPathResolver(drsConfig, drsCredentials)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object DrsCloudNioRegularFileAttributes {
.map(FileTime.from)
}

def convertToFileTime(drsPath: String, key: MarthaField.Value, timeInStringOption: Option[String]): IO[Option[FileTime]] = {
def convertToFileTime(drsPath: String, key: DrsResolverField.Value, timeInStringOption: Option[String]): IO[Option[FileTime]] = {
timeInStringOption match {
case None => IO.pure(None)
case Some(timeInString) =>
Expand All @@ -71,7 +71,7 @@ object DrsCloudNioRegularFileAttributes {
throwable =>
IO.raiseError(
new RuntimeException(
s"Error while parsing '$key' value from Martha to FileTime for DRS path $drsPath. " +
s"Error while parsing '$key' value from DRS Resolver to FileTime for DRS path $drsPath. " +
s"Reason: ${ExceptionUtils.getMessage(throwable)}.",
throwable,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import net.ceedubs.ficus.Ficus._
import scala.concurrent.duration._
import scala.language.postfixOps

final case class DrsConfig(marthaUrl: String,
final case class DrsConfig(drsResolverUrl: String,
numRetries: Int,
waitInitial: FiniteDuration,
waitMaximum: FiniteDuration,
Expand All @@ -22,45 +22,46 @@ object DrsConfig {
private val DefaultWaitMultiplier = 1.5d
private val DefaultWaitRandomizationFactor = 0.1

private val EnvMarthaUrl = "MARTHA_URL"
private val EnvMarthaNumRetries = "MARTHA_NUM_RETRIES"
private val EnvMarthaWaitInitialSeconds = "MARTHA_WAIT_INITIAL_SECONDS"
private val EnvMarthaWaitMaximumSeconds = "MARTHA_WAIT_MAXIMUM_SECONDS"
private val EnvMarthaWaitMultiplier = "MARTHA_WAIT_MULTIPLIER"
private val EnvMarthaWaitRandomizationFactor = "MARTHA_WAIT_RANDOMIZATION_FACTOR"
private val EnvDrsResolverUrl = "DRS_RESOLVER_URL"
private val EnvDrsResolverNumRetries = "DRS_RESOLVER_NUM_RETRIES"
private val EnvDrsResolverWaitInitialSeconds = "DRS_RESOLVER_WAIT_INITIAL_SECONDS"
private val EnvDrsResolverWaitMaximumSeconds = "DRS_RESOLVER_WAIT_MAXIMUM_SECONDS"
private val EnvDrsResolverWaitMultiplier = "DRS_RESOLVER_WAIT_MULTIPLIER"
private val EnvDrsResolverWaitRandomizationFactor = "DRS_RESOLVER_WAIT_RANDOMIZATION_FACTOR"

def fromConfig(marthaConfig: Config): DrsConfig = {

def fromConfig(drsResolverConfig: Config): DrsConfig = {
DrsConfig(
marthaUrl = marthaConfig.getString("url"),
numRetries = marthaConfig.getOrElse("num-retries", DefaultNumRetries),
waitInitial = marthaConfig.getOrElse("wait-initial", DefaultWaitInitial),
waitMaximum = marthaConfig.getOrElse("wait-maximum", DefaultWaitMaximum),
waitMultiplier = marthaConfig.getOrElse("wait-multiplier", DefaultWaitMultiplier),
drsResolverUrl = drsResolverConfig.getString("url"),
numRetries = drsResolverConfig.getOrElse("num-retries", DefaultNumRetries),
waitInitial = drsResolverConfig.getOrElse("wait-initial", DefaultWaitInitial),
waitMaximum = drsResolverConfig.getOrElse("wait-maximum", DefaultWaitMaximum),
waitMultiplier = drsResolverConfig.getOrElse("wait-multiplier", DefaultWaitMultiplier),
waitRandomizationFactor =
marthaConfig.getOrElse("wait-randomization-factor", DefaultWaitRandomizationFactor),
drsResolverConfig.getOrElse("wait-randomization-factor", DefaultWaitRandomizationFactor),
)
}

def fromEnv(env: Map[String, String]): DrsConfig = {
DrsConfig(
marthaUrl = env(EnvMarthaUrl),
numRetries = env.get(EnvMarthaNumRetries).map(_.toInt).getOrElse(DefaultNumRetries),
waitInitial = env.get(EnvMarthaWaitInitialSeconds).map(_.toLong.seconds).getOrElse(DefaultWaitInitial),
waitMaximum = env.get(EnvMarthaWaitMaximumSeconds).map(_.toLong.seconds).getOrElse(DefaultWaitMaximum),
waitMultiplier = env.get(EnvMarthaWaitMultiplier).map(_.toDouble).getOrElse(DefaultWaitMultiplier),
drsResolverUrl = env(EnvDrsResolverUrl),
numRetries = env.get(EnvDrsResolverNumRetries).map(_.toInt).getOrElse(DefaultNumRetries),
waitInitial = env.get(EnvDrsResolverWaitInitialSeconds).map(_.toLong.seconds).getOrElse(DefaultWaitInitial),
waitMaximum = env.get(EnvDrsResolverWaitMaximumSeconds).map(_.toLong.seconds).getOrElse(DefaultWaitMaximum),
waitMultiplier = env.get(EnvDrsResolverWaitMultiplier).map(_.toDouble).getOrElse(DefaultWaitMultiplier),
waitRandomizationFactor =
env.get(EnvMarthaWaitRandomizationFactor).map(_.toDouble).getOrElse(DefaultWaitRandomizationFactor),
env.get(EnvDrsResolverWaitRandomizationFactor).map(_.toDouble).getOrElse(DefaultWaitRandomizationFactor),
)
}

def toEnv(drsConfig: DrsConfig): Map[String, String] = {
Map(
EnvMarthaUrl -> drsConfig.marthaUrl,
EnvMarthaNumRetries -> s"${drsConfig.numRetries}",
EnvMarthaWaitInitialSeconds -> s"${drsConfig.waitInitial.toSeconds}",
EnvMarthaWaitMaximumSeconds -> s"${drsConfig.waitMaximum.toSeconds}",
EnvMarthaWaitMultiplier -> s"${drsConfig.waitMultiplier}",
EnvMarthaWaitRandomizationFactor -> s"${drsConfig.waitRandomizationFactor}",
EnvDrsResolverUrl -> drsConfig.drsResolverUrl,
EnvDrsResolverNumRetries -> s"${drsConfig.numRetries}",
EnvDrsResolverWaitInitialSeconds -> s"${drsConfig.waitInitial.toSeconds}",
EnvDrsResolverWaitMaximumSeconds -> s"${drsConfig.waitMaximum.toSeconds}",
EnvDrsResolverWaitMultiplier -> s"${drsConfig.waitMultiplier}",
EnvDrsResolverWaitRandomizationFactor -> s"${drsConfig.waitRandomizationFactor}",
)
}
}
Loading