Skip to content

Commit

Permalink
Merge branch 'develop' into gcp-batch-backend-execution-actor-test
Browse files Browse the repository at this point in the history
  • Loading branch information
dspeck1 authored Jan 17, 2025
2 parents 118f6b0 + a00a190 commit 3665e59
Show file tree
Hide file tree
Showing 32 changed files with 291 additions and 94 deletions.
1 change: 1 addition & 0 deletions .github/workflows/chart_update_on_merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ jobs:
if: github.event.pull_request.merged == true
runs-on: ubuntu-latest
steps:
- uses: sbt/setup-sbt@v1
- name: Clone Cromwell
uses: actions/checkout@v4
with:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/cromwell_unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ jobs:

steps:
- uses: actions/checkout@v3 # checkout the cromwell repo
- uses: sbt/setup-sbt@v1
- uses: ./.github/set_up_cromwell_action #Execute this reusable github action. It will set up java/sbt/git-secrets/cromwell.
with:
cromwell_repo_token: ${{ secrets.BROADBOT_GITHUB_TOKEN }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/docker_build_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
repository: broadinstitute/cromwell
token: ${{ secrets.BROADBOT_GITHUB_TOKEN }}
path: cromwell
- uses: sbt/setup-sbt@v1
- uses: actions/setup-java@v4
with:
distribution: 'temurin'
Expand Down
7 changes: 4 additions & 3 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ jobs:
friendly_name: Centaur Engine Upgrade Local with MySQL 5.7
- build_type: referenceDiskManifestBuilderApp
friendly_name: Reference Disk Manifest Builder App
- build_type: centaurSlurm
build_mysql: 5.7
friendly_name: "Centaur Slurm with MySQL 5.7"
# - build_type: centaurSlurm
# build_mysql: 5.7
# friendly_name: "Centaur Slurm with MySQL 5.7"
- build_type: centaurBlob
build_mysql: 5.7
friendly_name: Centaur Blob
Expand All @@ -114,6 +114,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 120
steps:
- uses: sbt/setup-sbt@v1
- uses: actions/checkout@v3 # checkout the cromwell repo
with:
ref: ${{ inputs.target-branch }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/scalafmt-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 20
steps:
- uses: sbt/setup-sbt@v1
- uses: actions/checkout@v3
with:
ref: ${{ inputs.target-branch }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/trivy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:

steps:
- uses: actions/checkout@v2
- uses: sbt/setup-sbt@v1

# fetch SBT package
- uses: actions/setup-java@v4
Expand Down
12 changes: 10 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,17 @@ The `IX_WORKFLOW_STORE_ENTRY_WS` index is removed from `WORKFLOW_STORE_ENTRY`.

The index had low cardinality and workflow pickup is faster without it. Migration time depends on workflow store size, but should be very fast for most installations. Terminal workflows are removed from the workflow store, so only running workflows contribute to the cost.

### Bug fixes and small changes
#### Index additions

* Changed default boot disk size from 10GB to 20GB in PipelinesAPI and Google Batch backends
The `IX_METADATA_ENTRY_WEU_MK` index is added to `METADATA_ENTRY`. In pre-release testing, the migration proceeded at about 3 million rows per minute. Please plan downtime accordingly.

#### Unique constraint addition

The `UC_GROUP_METRICS_ENTRY_GI` constraint has been added to column `GROUP_ID` in the `GROUP_METRICS_ENTRY` table. This additionally enforces at the database level that the `GROUP_ID` column always contains unique values.

### Reduce errors from boot disk filling up on Google Lifesciences API

* If Cromwell can't determine the size of the user command Docker image, it will increase Lifesciences API boot disk size by 30GB rather than 0. This should reduce incidence of tasks failing due to boot disk filling up.

#### Improved `size()` function performance on arrays

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ files {

metadata {
status: Succeeded
"outputs.docker_size_dockerhub.large_dockerhub_image_with_hash.bootDiskSize": 27
"outputs.docker_size_dockerhub.large_dockerhub_image_with_tag.bootDiskSize": 27
"outputs.docker_size_dockerhub.large_dockerhub_image_with_hash.bootDiskSize": 17
"outputs.docker_size_dockerhub.large_dockerhub_image_with_tag.bootDiskSize": 17
}

workflowType: WDL
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ files {

metadata {
status: Succeeded
"outputs.docker_size_gcr.large_gcr_image_with_hash.bootDiskSize": 27
"outputs.docker_size_gcr.large_gcr_image_with_tag.bootDiskSize": 27
"outputs.docker_size_gcr.large_gcr_image_with_hash.bootDiskSize": 17
"outputs.docker_size_gcr.large_gcr_image_with_tag.bootDiskSize": 17
}

workflowType: WDL
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package cromwell.cloudsupport.azure

import com.azure.core.management.AzureEnvironment
import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._

object AzureConfiguration {

  // All Azure-related settings live under the top-level "azure" config block
  // (see reference.conf for the shipped defaults).
  private val azureConfig = ConfigFactory.load().getConfig("azure")

  // Which Azure cloud to target. Optional in config; when absent we fall back
  // to the public cloud ("AzureCloud").
  val azureEnvironment: AzureEnvironment = {
    val configuredName = azureConfig.as[Option[String]]("azure-environment")
    AzureEnvironmentConverter.fromString(configuredName.getOrElse(AzureEnvironmentConverter.Azure))
  }

  // Token scope string used when requesting management-plane access tokens.
  val azureTokenScopeManagement: String = azureConfig.as[String]("token-scope-management")
}

object AzureEnvironmentConverter {

  // Canonical configuration names for the supported Azure clouds.
  val Azure: String = "AzureCloud"
  val AzureGov: String = "AzureUSGovernmentCloud"
  val AzureChina: String = "AzureChinaCloud"

  /**
    * Translates a configured cloud name into the Azure SDK environment.
    * Any unrecognized value — including the explicit default "AzureCloud" —
    * resolves to the public Azure cloud.
    */
  def fromString(s: String): AzureEnvironment = s match {
    case AzureGov   => AzureEnvironment.AZURE_US_GOVERNMENT
    case AzureChina => AzureEnvironment.AZURE_CHINA
    case _          => AzureEnvironment.AZURE
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cromwell.cloudsupport.azure

import cats.implicits.catsSyntaxValidatedId
import com.azure.core.credential.TokenRequestContext
import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
import common.validation.ErrorOr.ErrorOr
Expand All @@ -20,8 +19,8 @@ case object AzureCredentials {

final val tokenAcquisitionTimeout = 5.seconds

val azureProfile = new AzureProfile(AzureEnvironment.AZURE)
val tokenScope = "https://management.azure.com/.default"
val azureProfile = new AzureProfile(AzureConfiguration.azureEnvironment)
val tokenScope = AzureConfiguration.azureTokenScopeManagement

private def tokenRequestContext: TokenRequestContext = {
val trc = new TokenRequestContext()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package cromwell.cloudsupport.azure

import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.resourcemanager.AzureResourceManager
Expand Down Expand Up @@ -33,7 +32,7 @@ object AzureUtils {
.map(Success(_))
.getOrElse(Failure(new Exception("Could not parse storage account")))

val azureProfile = new AzureProfile(AzureEnvironment.AZURE)
val azureProfile = new AzureProfile(AzureConfiguration.azureEnvironment)

def azureCredentialBuilder = new DefaultAzureCredentialBuilder()
.authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint)
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@ call-caching {
max-failed-copy-attempts = 1000000
}

# Azure cloud settings consumed by cromwell.cloudsupport.azure.AzureConfiguration.
# azure-environment selects the target cloud ("AzureCloud", "AzureUSGovernmentCloud",
# or "AzureChinaCloud"); token-scope-management is the scope string used when
# requesting management-plane tokens.
azure {
azure-environment = "AzureCloud"
token-scope-management = "https://management.azure.com/.default"
}

google {

application-name = "cromwell"
Expand Down
1 change: 1 addition & 0 deletions database/migration/src/main/resources/changelog.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
<include file="changesets/enlarge_sub_workflow_store_entry_id.xml" relativeToChangelogFile="true" />
<include file="changesets/workflow_store_drop_state_index.xml" relativeToChangelogFile="true" />
<include file="changesets/add_group_metrics_table.xml" relativeToChangelogFile="true" />
<include file="changesets/add_group_metrics_unique_constraint.xml" relativeToChangelogFile="true" />
<!-- WARNING!
This changeset should always be last.
It is always run (and should always run last) to set table ownership correctly.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog objectQuotingStrategy="QUOTE_ALL_OBJECTS"
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.3.xsd">

<changeSet id="add_unique_constraint_group_metrics" author="sshah" dbms="mysql,hsqldb,mariadb,postgresql">
<!-- Replace the plain (non-unique) index on GROUP_ID with a unique constraint:
     first drop the old index, then add the constraint. The constraint enforces
     at the database level that each GROUP_ID appears at most once. -->
<dropIndex
tableName="GROUP_METRICS_ENTRY"
indexName="IX_GROUP_METRICS_ENTRY_GI" />

<addUniqueConstraint
tableName="GROUP_METRICS_ENTRY"
columnNames="GROUP_ID"
constraintName="UC_GROUP_METRICS_ENTRY_GI"/>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog objectQuotingStrategy="QUOTE_ALL_OBJECTS"
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.3.xsd">

<changeSet id="metadata_index_add_workflow_key" author="anichols" dbms="hsqldb,mariadb,mysql,postgresql">
<!--
This index creates at about 3M rows per minute on MySQL.
That would be an impossible multi-day downtime in Terra, so we manually pre-create the index asynchronously.
This changeset detects environments where this has been done and immediately marks itself as applied.
-->
<preConditions onFail="MARK_RAN">
<not>
<indexExists indexName="IX_METADATA_ENTRY_WEU_MK"/>
</not>
</preConditions>
<!-- Composite index, workflow UUID first then metadata key, to serve
     key-pattern lookups scoped to a single workflow. -->
<createIndex indexName="IX_METADATA_ENTRY_WEU_MK" tableName="METADATA_ENTRY">
<column name="WORKFLOW_EXECUTION_UUID"/>
<column name="METADATA_KEY"/>
</createIndex>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<include file="metadata_changesets/remove_non_summarizable_metadata_from_queue.xml" relativeToChangelogFile="true" />
<include file="metadata_changesets/update_metadata_archive_index.xml" relativeToChangelogFile="true" />
<include file="metadata_changesets/reset_archive_statuses_to_null.xml" relativeToChangelogFile="true" />
<include file="metadata_changesets/metadata_index_add_workflow_key.xml" relativeToChangelogFile="true" />
<!-- WARNING!
This changeset should always be last.
It is always run (and should always run last) to set table ownership correctly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package cromwell.database.slick

import cromwell.database.sql.GroupMetricsSqlDatabase
import cromwell.database.sql.tables.GroupMetricsEntry
import org.postgresql.util.PSQLException
import slick.jdbc.TransactionIsolation

import java.sql.Timestamp
import java.sql.{SQLIntegrityConstraintViolationException, Timestamp}
import scala.concurrent.{ExecutionContext, Future}

trait GroupMetricsSlickDatabase extends GroupMetricsSqlDatabase {
Expand All @@ -14,16 +16,47 @@ trait GroupMetricsSlickDatabase extends GroupMetricsSqlDatabase {
override def recordGroupMetricsEntry(
groupMetricsEntry: GroupMetricsEntry
)(implicit ec: ExecutionContext): Future[Unit] = {
val action = for {
updateCount <- dataAccess
.quotaExhaustionForGroupId(groupMetricsEntry.groupId)
.update(groupMetricsEntry.quotaExhaustionDetected)
_ <- updateCount match {
case 0 => dataAccess.groupMetricsEntryIdsAutoInc += groupMetricsEntry
case _ => assertUpdateCount("recordGroupMetricsEntry", updateCount, 1)
}
def updateGroupMetricsEntry(): Future[Unit] = {
val updateAction = for {
_ <- dataAccess
.quotaExhaustionForGroupId(groupMetricsEntry.groupId)
.update(groupMetricsEntry.quotaExhaustionDetected)
} yield ()

// The transaction level is set to 'ReadCommitted' to avoid Postgres returning error `could not serialize
// access due to concurrent update` which happens in 'RepeatableRead' or higher isolation levels.
// See https://stackoverflow.com/questions/50797097/postgres-could-not-serialize-access-due-to-concurrent-update
runTransaction(updateAction, TransactionIsolation.ReadCommitted)
}

/*
The approach here is to try and insert the record into the table and if that fails because a record with that
group_id already exists, then it will update that record with new quota_exhaustion_detected timestamp.
The insert and update happen in 2 different transactions. Using 2 separate transactions avoids deadlocks that
occur during upserts or when using Slick's insertOrUpdate in a single transaction. Deadlocks occur in scenarios
with concurrent threads trying to update the table, even under the stricter Serializable transaction isolation
level. This is caused by a gap lock on the IX_GROUP_METRICS_ENTRY_GI index, which locks gaps between index
records on a page, leading to deadlocks for transactions involving the same or nearby group_id values.
See https://broadworkbench.atlassian.net/browse/AN-286 and https://broadworkbench.atlassian.net/browse/WX-1847.
This approach should have minimal performance impact since the table has only 3 columns and low cardinality.
Note: a unique constraint on group_id also exists in database.
*/
val insertAction = for {
_ <- dataAccess.groupMetricsEntryIdsAutoInc += groupMetricsEntry
} yield ()
runTransaction(action)

runTransaction(insertAction)
.recoverWith {
case ex
if ex.isInstanceOf[SQLIntegrityConstraintViolationException] || (ex
.isInstanceOf[PSQLException] && ex.getMessage.contains(
"duplicate key value violates unique constraint \"UC_GROUP_METRICS_ENTRY_GI\""
)) =>
updateGroupMetricsEntry()
case e => Future.failed(e)
}
}

override def countGroupMetricsEntries(groupId: String)(implicit ec: ExecutionContext): Future[Int] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait GroupMetricsEntryComponent {
groupMetricsEntryId.?
) <> ((GroupMetricsEntry.apply _).tupled, GroupMetricsEntry.unapply)

def ixGroupMetricsEntryGi = index("IX_GROUP_METRICS_ENTRY_GI", groupId, unique = false)
def ucGroupMetricsEntryGi = index("UC_GROUP_METRICS_ENTRY_GI", groupId, unique = true)
}

protected val groupMetricsEntries = TableQuery[GroupMetricsEntries]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ trait MetadataEntryComponent {

// TODO: rename index via liquibase
def ixMetadataEntryWeu = index("METADATA_WORKFLOW_IDX", workflowExecutionUuid, unique = false)

/**
* Index designed to accelerate common key-specific queries across an entire workflow, such as:
* - Get workflow-level `outputs%` keys (no tasks, requireEmptyJobKey = true)
* - Get all `vmStartTime%`, `vmEndTime%`, `vmCostPerHour%` keys in the workflow (include tasks, requireEmptyJobKey = false)
*
* It is NOT good, as in may make actively slower, queries that reference a specific job. If we do more
* with getting metadata for individual jobs, recommend creating this index with all 5 columns:
* - WORKFLOW_EXECUTION_UUID, CALL_FQN, JOB_SCATTER_INDEX, JOB_RETRY_ATTEMPT, METADATA_KEY
*
* Do NOT recommend this alternate order, as wildcards in the middle are inefficient and this can be
* slower than no indexes. Tested with 20M row `69e8259c` workflow in October 2024.
* - WORKFLOW_EXECUTION_UUID, METADATA_KEY, CALL_FQN, JOB_SCATTER_INDEX, JOB_RETRY_ATTEMPT
*
* @return A reference to the index
*/
def ixMetadataEntryWeuMk = index("IX_METADATA_ENTRY_WEU_MK", (workflowExecutionUuid, metadataKey), unique = false)
}

val metadataEntries = TableQuery[MetadataEntries]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ class MetadataBuilderActorSpec
val subWorkflow1aId = WorkflowId(UUID.fromString("1a1a1a1a-f76d-4af3-b371-5ba580916729"))
val subWorkflow1bId = WorkflowId(UUID.fromString("1b1b1b1b-f76d-4af3-b371-5ba580916729"))

val workflowState = WorkflowSucceeded
val workflowSucceededState = WorkflowSucceeded
val workflowRunningState = WorkflowRunning

val mainEvents = List(
MetadataEvent(MetadataKey(mainWorkflowId, Option(MetadataJobKey("wfMain", None, 1)), "subWorkflowId"),
Expand Down Expand Up @@ -391,19 +392,25 @@ class MetadataBuilderActorSpec
class TestReadDatabaseMetadataWorkerActorForCost extends ReadDatabaseMetadataWorkerActor(defaultTimeout, 1000000) {
override def receive: Receive = {
case GetCost(wfId) if wfId == mainWorkflowId =>
sender() ! CostResponse(mainWorkflowId, workflowState, MetadataLookupResponse(mainQuery, mainEvents))
sender() ! CostResponse(mainWorkflowId, workflowRunningState, MetadataLookupResponse(mainQuery, mainEvents))
()
case GetCost(wfId) if wfId == subWorkflow1Id =>
sender() ! CostResponse(subWorkflow1Id, workflowState, MetadataLookupResponse(sub1Query, sub1Events))
sender() ! CostResponse(subWorkflow1Id, workflowSucceededState, MetadataLookupResponse(sub1Query, sub1Events))
()
case GetCost(wfId) if wfId == subWorkflow2Id =>
sender() ! CostResponse(subWorkflow2Id, workflowState, MetadataLookupResponse(sub2Query, sub2Events))
sender() ! CostResponse(subWorkflow2Id, workflowSucceededState, MetadataLookupResponse(sub2Query, sub2Events))
()
case GetCost(wfId) if wfId == subWorkflow1aId =>
sender() ! CostResponse(subWorkflow1aId, workflowState, MetadataLookupResponse(sub1aQuery, sub1aEvents))
sender() ! CostResponse(subWorkflow1aId,
workflowSucceededState,
MetadataLookupResponse(sub1aQuery, sub1aEvents)
)
()
case GetCost(wfId) if wfId == subWorkflow1bId =>
sender() ! CostResponse(subWorkflow1bId, workflowState, MetadataLookupResponse(sub1bQuery, sub1bEvents))
sender() ! CostResponse(subWorkflow1bId,
workflowSucceededState,
MetadataLookupResponse(sub1bQuery, sub1bEvents)
)
()
case _ => ()
}
Expand All @@ -414,7 +421,7 @@ class MetadataBuilderActorSpec
|"cost": 7,
|"currency": "USD",
|"id": "${mainWorkflowId}",
|"status": "${workflowState.toString}",
|"status": "${workflowRunningState.toString}",
|"errors": ["Couldn't find valid vmCostPerHour for call1aA.-1.1"]
|}""".stripMargin

Expand Down
Loading

0 comments on commit 3665e59

Please sign in to comment.