Skip to content

Commit

Permalink
Merge branch 'develop' into AN-333-drs-retry
Browse files Browse the repository at this point in the history
  • Loading branch information
lucymcnatt authored Jan 16, 2025
2 parents f93fa88 + 7818492 commit 24e2d6d
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 44 deletions.
1 change: 1 addition & 0 deletions .github/workflows/cromwell_unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ jobs:

steps:
- uses: actions/checkout@v3 # checkout the cromwell repo
- uses: sbt/setup-sbt@v1
- uses: ./.github/set_up_cromwell_action #Exectute this reusable github action. It will set up java/sbt/git-secrets/cromwell.
with:
cromwell_repo_token: ${{ secrets.BROADBOT_GITHUB_TOKEN }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/docker_build_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
repository: broadinstitute/cromwell
token: ${{ secrets.BROADBOT_GITHUB_TOKEN }}
path: cromwell
- uses: sbt/setup-sbt@v1
- uses: actions/setup-java@v4
with:
distribution: 'temurin'
Expand Down
7 changes: 4 additions & 3 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ jobs:
friendly_name: Centaur Engine Upgrade Local with MySQL 5.7
- build_type: referenceDiskManifestBuilderApp
friendly_name: Reference Disk Manifest Builder App
- build_type: centaurSlurm
build_mysql: 5.7
friendly_name: "Centaur Slurm with MySQL 5.7"
# - build_type: centaurSlurm
# build_mysql: 5.7
# friendly_name: "Centaur Slurm with MySQL 5.7"
- build_type: centaurBlob
build_mysql: 5.7
friendly_name: Centaur Blob
Expand All @@ -114,6 +114,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 120
steps:
- uses: sbt/setup-sbt@v1
- uses: actions/checkout@v3 # checkout the cromwell repo
with:
ref: ${{ inputs.target-branch }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/scalafmt-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 20
steps:
- uses: sbt/setup-sbt@v1
- uses: actions/checkout@v3
with:
ref: ${{ inputs.target-branch }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/trivy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:

steps:
- uses: actions/checkout@v2
- uses: sbt/setup-sbt@v1

# fetch SBT package
- uses: actions/setup-java@v4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ class MetadataBuilderActorSpec
val subWorkflow1aId = WorkflowId(UUID.fromString("1a1a1a1a-f76d-4af3-b371-5ba580916729"))
val subWorkflow1bId = WorkflowId(UUID.fromString("1b1b1b1b-f76d-4af3-b371-5ba580916729"))

val workflowState = WorkflowSucceeded
val workflowSucceededState = WorkflowSucceeded
val workflowRunningState = WorkflowRunning

val mainEvents = List(
MetadataEvent(MetadataKey(mainWorkflowId, Option(MetadataJobKey("wfMain", None, 1)), "subWorkflowId"),
Expand Down Expand Up @@ -391,19 +392,25 @@ class MetadataBuilderActorSpec
class TestReadDatabaseMetadataWorkerActorForCost extends ReadDatabaseMetadataWorkerActor(defaultTimeout, 1000000) {
override def receive: Receive = {
case GetCost(wfId) if wfId == mainWorkflowId =>
sender() ! CostResponse(mainWorkflowId, workflowState, MetadataLookupResponse(mainQuery, mainEvents))
sender() ! CostResponse(mainWorkflowId, workflowRunningState, MetadataLookupResponse(mainQuery, mainEvents))
()
case GetCost(wfId) if wfId == subWorkflow1Id =>
sender() ! CostResponse(subWorkflow1Id, workflowState, MetadataLookupResponse(sub1Query, sub1Events))
sender() ! CostResponse(subWorkflow1Id, workflowSucceededState, MetadataLookupResponse(sub1Query, sub1Events))
()
case GetCost(wfId) if wfId == subWorkflow2Id =>
sender() ! CostResponse(subWorkflow2Id, workflowState, MetadataLookupResponse(sub2Query, sub2Events))
sender() ! CostResponse(subWorkflow2Id, workflowSucceededState, MetadataLookupResponse(sub2Query, sub2Events))
()
case GetCost(wfId) if wfId == subWorkflow1aId =>
sender() ! CostResponse(subWorkflow1aId, workflowState, MetadataLookupResponse(sub1aQuery, sub1aEvents))
sender() ! CostResponse(subWorkflow1aId,
workflowSucceededState,
MetadataLookupResponse(sub1aQuery, sub1aEvents)
)
()
case GetCost(wfId) if wfId == subWorkflow1bId =>
sender() ! CostResponse(subWorkflow1bId, workflowState, MetadataLookupResponse(sub1bQuery, sub1bEvents))
sender() ! CostResponse(subWorkflow1bId,
workflowSucceededState,
MetadataLookupResponse(sub1bQuery, sub1bEvents)
)
()
case _ => ()
}
Expand All @@ -414,7 +421,7 @@ class MetadataBuilderActorSpec
|"cost": 7,
|"currency": "USD",
|"id": "${mainWorkflowId}",
|"status": "${workflowState.toString}",
|"status": "${workflowRunningState.toString}",
|"errors": ["Couldn't find valid vmCostPerHour for call1aA.-1.1"]
|}""".stripMargin

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,43 @@ object MetadataBuilderActor {
case object IdleData extends MetadataBuilderActorData
final case class HasWorkData(target: ActorRef, originalRequest: BuildMetadataJsonAction)
extends MetadataBuilderActorData

// Classes extending this trait are used to track state when the actor has launched child
// actors to collect metadata for subworkflows. This class aggregates data as it comes in,
// and builds the complete output when all subworkflow data is present. There's one child
// class for plain metadata queries and one for cost queries.
sealed trait EventsCollectorData extends MetadataBuilderActorData {
val target: ActorRef
val originalRequest: BuildMetadataJsonAction
val originalQuery: MetadataQuery
val originalEvents: Seq[MetadataEvent]
val subWorkflowsMetadata: Map[String, JsValue]
val waitFor: Int

def isComplete = subWorkflowsMetadata.size == waitFor
}

final case class HasReceivedEventsData(target: ActorRef,
originalRequest: BuildMetadataJsonAction,
originalQuery: MetadataQuery,
originalEvents: Seq[MetadataEvent],
subWorkflowsMetadata: Map[String, JsValue],
waitFor: Int
) extends MetadataBuilderActorData {
) extends EventsCollectorData {
def withSubWorkflow(id: String, metadata: JsValue) =
this.copy(subWorkflowsMetadata = subWorkflowsMetadata + ((id, metadata)))
}

def isComplete = subWorkflowsMetadata.size == waitFor
final case class HasReceivedCostEventsData(target: ActorRef,
originalRequest: BuildMetadataJsonAction,
originalQuery: MetadataQuery,
originalEvents: Seq[MetadataEvent],
originalStatus: WorkflowState,
subWorkflowsMetadata: Map[String, JsValue],
waitFor: Int
) extends EventsCollectorData {
def withSubWorkflow(id: String, metadata: JsValue) =
this.copy(subWorkflowsMetadata = subWorkflowsMetadata + ((id, metadata)))
}

def props(readMetadataWorkerMaker: () => Props,
Expand Down Expand Up @@ -432,7 +458,7 @@ class MetadataBuilderActor(readMetadataWorkerMaker: () => Props,
}

when(WaitingForSubWorkflowCost) {
case Event(mbr: MetadataJsonResponse, data: HasReceivedEventsData) =>
case Event(mbr: MetadataJsonResponse, data: HasReceivedCostEventsData) =>
processSubWorkflowCost(mbr, data)
case Event(failure: MetadataServiceFailure, data: HasReceivedEventsData) =>
data.target ! FailedMetadataJsonResponse(data.originalRequest, failure.reason)
Expand Down Expand Up @@ -482,7 +508,7 @@ class MetadataBuilderActor(readMetadataWorkerMaker: () => Props,
failAndDie(new Exception(message), data.target, data.originalRequest)
}

def processSubWorkflowCost(metadataResponse: MetadataJsonResponse, data: HasReceivedEventsData) =
def processSubWorkflowCost(metadataResponse: MetadataJsonResponse, data: HasReceivedCostEventsData) =
metadataResponse match {
case SuccessfulMetadataJsonResponse(GetCost(workflowId), js) =>
val subId: WorkflowId = workflowId
Expand All @@ -491,7 +517,7 @@ class MetadataBuilderActor(readMetadataWorkerMaker: () => Props,
if (newData.isComplete) {
buildCostAndStop(
data.originalQuery.workflowId,
extractFromJsAs[JsString](js, "status").map(_.value).getOrElse(""), // should never be empty
data.originalStatus,
data.originalEvents,
newData.subWorkflowsMetadata,
data.target,
Expand Down Expand Up @@ -579,7 +605,7 @@ class MetadataBuilderActor(readMetadataWorkerMaker: () => Props,

if (subWorkflowIds.isEmpty)
// If no subworkflows found, just build cost data
buildCostAndStop(id, status.toString, metadataResponse.eventList, Map.empty, target, originalRequest)
buildCostAndStop(id, status, metadataResponse.eventList, Map.empty, target, originalRequest)
else {
// Otherwise spin up a metadata builder actor for each sub workflow
subWorkflowIds foreach { subId =>
Expand All @@ -591,18 +617,19 @@ class MetadataBuilderActor(readMetadataWorkerMaker: () => Props,
)
subMetadataBuilder ! GetCost(WorkflowId.fromString(subId))
}
goto(WaitingForSubWorkflowCost) using HasReceivedEventsData(target,
originalRequest,
metadataResponse.query,
metadataResponse.eventList,
Map.empty,
subWorkflowIds.size
goto(WaitingForSubWorkflowCost) using HasReceivedCostEventsData(target,
originalRequest,
metadataResponse.query,
metadataResponse.eventList,
status,
Map.empty,
subWorkflowIds.size
)
}
}

def buildCostAndStop(id: WorkflowId,
status: String,
status: WorkflowState,
eventsList: Seq[MetadataEvent],
expandedValues: Map[String, JsValue],
target: ActorRef,
Expand Down Expand Up @@ -644,7 +671,7 @@ class MetadataBuilderActor(readMetadataWorkerMaker: () => Props,
val resp = JsObject(
Map(
WorkflowMetadataKeys.Id -> JsString(id.toString),
WorkflowMetadataKeys.Status -> JsString(status),
WorkflowMetadataKeys.Status -> JsString(status.toString),
"currency" -> JsString(DefaultCurrency.getCurrencyCode),
"cost" -> JsNumber(callCost + subworkflowCost),
"errors" -> JsArray(costErrors ++ subworkflowErrors)
Expand Down
4 changes: 3 additions & 1 deletion src/ci/bin/test.inc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,9 @@ cromwell::private::pip_install() {

cromwell::private::upgrade_pip() {
sudo apt-get install -y python3-pip
cromwell::private::pip_install pip --upgrade
# as of ubuntu 23 need to pass --user flag
# https://mail.openvswitch.org/pipermail/ovs-dev/2024-June/414969.html
cromwell::private::pip_install pip --upgrade --user pip
cromwell::private::pip_install requests[security] --ignore-installed
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,6 @@ import common.validation.ErrorOr._
import common.validation.Validation._
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.core.DockerCredentials
import spray.json.{JsString, JsValue}

/**
* Interface for Authentication information that can be included as a json object in the file uploaded to GCS
* upon workflow creation and used in the VM.
*/
sealed trait PipelinesApiAuthObject {
def context: String
def map: Map[String, JsValue]

def toMap: Map[String, Map[String, JsValue]] = Map(context -> map)
}

object PipelinesApiDockerCredentials {

Expand Down Expand Up @@ -53,10 +41,3 @@ case class PipelinesApiDockerCredentials(override val token: String,
override val keyName: Option[String],
override val authName: Option[String]
) extends DockerCredentials(token = token, keyName = keyName, authName = authName)
with PipelinesApiAuthObject {

override val context = "docker"
override val map = Map(
"token" -> JsString(token)
)
}

0 comments on commit 24e2d6d

Please sign in to comment.