diff --git a/orca-core-tck/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/PipelineExecutionRepositoryTck.groovy b/orca-core-tck/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/PipelineExecutionRepositoryTck.groovy
index c9b482e85c..75f2e5b020 100644
--- a/orca-core-tck/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/PipelineExecutionRepositoryTck.groovy
+++ b/orca-core-tck/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/PipelineExecutionRepositoryTck.groovy
@@ -149,6 +149,8 @@ abstract class PipelineExecutionRepositoryTck ext
       buildTime = 0
       trigger = new DefaultTrigger("manual")
     }
+    // our ULID implementation isn't monotonic
+    sleep(5)
     def succeededExecution = orchestration {
       status = SUCCEEDED
       buildTime = 0
@@ -157,8 +159,6 @@
 
     when:
    repository.store(runningExecution)
-    // our ULID implementation isn't monotonic
-    sleep(5)
    repository.store(succeededExecution)
    def orchestrations = repository.retrieveOrchestrationsForApplication(
      runningExecution.application,
diff --git a/orca-peering/src/main/kotlin/com/netflix/spinnaker/orca/peering/ExecutionCopier.kt b/orca-peering/src/main/kotlin/com/netflix/spinnaker/orca/peering/ExecutionCopier.kt
index dc332a6213..351817ee28 100644
--- a/orca-peering/src/main/kotlin/com/netflix/spinnaker/orca/peering/ExecutionCopier.kt
+++ b/orca-peering/src/main/kotlin/com/netflix/spinnaker/orca/peering/ExecutionCopier.kt
@@ -160,7 +160,7 @@ open class ExecutionCopier(
 
       // Step 2: Copy all executions
       executionRows.forEach { r ->
-        r.set(DSL.field("partition"), peeredId)
+        r.set(DSL.field(DSL.name("partition")), peeredId)
         latestUpdatedAt = max(latestUpdatedAt, r.get("updated_at", Long::class.java))
       }
       destDB.loadRecords(getExecutionTable(executionType).name, executionRows)
diff --git a/orca-sql-postgres/orca-sql-postgres.gradle b/orca-sql-postgres/orca-sql-postgres.gradle
new file mode 100644
index 0000000000..e2ff91272c
--- /dev/null
+++ b/orca-sql-postgres/orca-sql-postgres.gradle
@@ -0,0 +1,4 @@
+dependencies {
+  implementation(project(":orca-sql"))
+  runtimeOnly("org.postgresql:postgresql")
+}
diff --git a/orca-sql/orca-sql.gradle b/orca-sql/orca-sql.gradle
index eba5e0de73..842acf1e9a 100644
--- a/orca-sql/orca-sql.gradle
+++ b/orca-sql/orca-sql.gradle
@@ -45,8 +45,10 @@ dependencies {
   testImplementation("dev.minutest:minutest")
   testImplementation("com.nhaarman:mockito-kotlin")
   testImplementation("org.testcontainers:mysql")
+  testImplementation("org.testcontainers:postgresql")
 
   testRuntimeOnly("mysql:mysql-connector-java")
+  testRuntimeOnly("org.postgresql:postgresql")
   testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
   testRuntimeOnly("org.junit.platform:junit-platform-launcher")
   testRuntimeOnly("org.junit.vintage:junit-vintage-engine")
diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/notifications/SqlNotificationClusterLock.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/notifications/SqlNotificationClusterLock.kt
index 155c8ad03f..7446d810f3 100644
--- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/notifications/SqlNotificationClusterLock.kt
+++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/notifications/SqlNotificationClusterLock.kt
@@ -23,6 +23,7 @@ import io.vavr.control.Try
 import java.time.Clock
 import java.time.Duration
 import org.jooq.DSLContext
+import org.jooq.SQLDialect
 import org.jooq.exception.SQLDialectNotSupportedException
 import org.jooq.impl.DSL
 import org.slf4j.LoggerFactory
@@ -52,8 +53,19 @@ class SqlNotificationClusterLock(
         jooq.insertInto(lockTable)
           .set(lockField, notificationType)
           .set(expiryField, now.plusSeconds(lockTimeoutSeconds).toEpochMilli())
-          .onDuplicateKeyIgnore()
-          .execute()
+          .run {
+            when (jooq.dialect()) {
+              SQLDialect.POSTGRES -> {
+                onConflict(DSL.field("lock_name"))
+                  .doNothing()
+                  .execute()
+              }
+              else -> {
+                onDuplicateKeyIgnore()
+                  .execute()
+              }
+            }
+          }
       }
 
     if (changed == 0) {
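The hunk above is the recurring compatibility pattern in this PR: MySQL's ON DUPLICATE KEY clauses have no Postgres equivalent, so the statement is finished per dialect via a run/when dispatch on jooq.dialect(). Below is a minimal standalone sketch of the same insert-if-absent idea, assuming only a DSLContext connected to either database and the notification_lock schema from the changelog further down; the function name tryAcquire is illustrative, not part of the change.

```kotlin
import org.jooq.DSLContext
import org.jooq.SQLDialect
import org.jooq.impl.DSL

// Insert-if-absent across dialects: Postgres needs ON CONFLICT ... DO NOTHING,
// MySQL keeps ON DUPLICATE KEY IGNORE. Returns true if the row was inserted,
// i.e. this caller won the lock.
fun tryAcquire(jooq: DSLContext, lockName: String, expiresAt: Long): Boolean {
  val inserted = jooq.insertInto(DSL.table("notification_lock"))
    .set(DSL.field("lock_name", String::class.java), lockName)
    .set(DSL.field("expiry", Long::class.java), expiresAt)
    .run {
      when (jooq.dialect()) {
        SQLDialect.POSTGRES -> onConflict(DSL.field("lock_name")).doNothing().execute()
        else -> onDuplicateKeyIgnore().execute()
      }
    }
  // 0 rows inserted means the row already existed: another instance holds the lock
  return inserted == 1
}
```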
diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgent.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgent.kt
index f690943325..90b744e8d6 100644
--- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgent.kt
+++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgent.kt
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.jooq.DSLContext
 import org.jooq.impl.DSL.count
 import org.jooq.impl.DSL.field
+import org.jooq.impl.DSL.name
 import org.jooq.impl.DSL.table
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
 import org.springframework.boot.context.properties.EnableConfigurationProperties
@@ -84,7 +85,7 @@
       if (orcaSqlProperties.partitionName != null) {
         queryBuilder = queryBuilder
           .and(
-            field("`partition`").eq(orcaSqlProperties.partitionName))
+            field(name("partition")).eq(orcaSqlProperties.partitionName))
       }
 
       val pipelineConfigsWithOldExecutions = queryBuilder
@@ -145,7 +146,7 @@
     if (orcaSqlProperties.partitionName != null) {
       queryBuilder = queryBuilder
         .and(
-          field("`partition`").eq(orcaSqlProperties.partitionName))
+          field(name("partition")).eq(orcaSqlProperties.partitionName))
     }
 
     val executionsToRemove = queryBuilder
diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/cleanup/TopApplicationExecutionCleanupPollingNotificationAgent.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/cleanup/TopApplicationExecutionCleanupPollingNotificationAgent.kt
index 97e16f7858..46a93ce420 100644
--- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/cleanup/TopApplicationExecutionCleanupPollingNotificationAgent.kt
+++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/cleanup/TopApplicationExecutionCleanupPollingNotificationAgent.kt
@@ -25,6 +25,7 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
 import java.util.concurrent.atomic.AtomicInteger
 import org.jooq.DSLContext
 import org.jooq.impl.DSL
+import org.jooq.impl.DSL.name
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
 import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.stereotype.Component
@@ -62,7 +63,7 @@
         .where(if (orcaSqlProperties.partitionName == null) {
           DSL.noCondition()
         } else {
-          DSL.field("`partition`").eq(orcaSqlProperties.partitionName)
+          DSL.field(name("partition")).eq(orcaSqlProperties.partitionName)
         })
         .and(DSL.field("application").`in`(*chunk.toTypedArray()))
         .groupBy(DSL.field("application"))
diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt
index 7e32820964..47faff1437 100644
--- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt
+++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt
@@ -45,7 +45,7 @@ class ExecutionMapper(
         mapper.readValue(rs.getString("body"))
           .also { execution ->
             results.add(execution)
-            execution.partition = rs.getString("`partition`")
+            execution.partition = rs.getString("partition")
 
             if (rs.getString("id") != execution.id) {
               // Map legacyId executions to their current ULID
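The partition-column rewrites in this and the neighboring files are all the same fix: partition is a reserved word in both MySQL 8 and PostgreSQL, and the old hand-written backtick quoting only parses on MySQL. DSL.name() lets jOOQ render whichever identifier quoting the active dialect requires, while the plain-JDBC read in ExecutionMapper drops quoting entirely, since ResultSet.getString takes a bare column label rather than SQL. A quick sketch of the rendering difference:

```kotlin
import org.jooq.SQLDialect
import org.jooq.impl.DSL

fun main() {
  // DSL.name() produces a quoted identifier and leaves the quote style
  // to the dialect, keeping the reserved word legal on both engines.
  val partition = DSL.field(DSL.name("partition"))
  println(DSL.using(SQLDialect.MYSQL).render(partition))    // `partition`
  println(DSL.using(SQLDialect.POSTGRES).render(partition)) // "partition"
}
```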
diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt
index 95472b9dee..0fbe298ce0 100644
--- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt
+++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt
@@ -54,6 +54,7 @@ import org.jooq.DSLContext
 import org.jooq.DatePart
 import org.jooq.Field
 import org.jooq.Record
+import org.jooq.SQLDialect
 import org.jooq.SelectConditionStep
 import org.jooq.SelectConnectByStep
 import org.jooq.SelectForUpdateStep
@@ -601,7 +602,7 @@
 
   override fun countActiveExecutions(): ActiveExecutionsReport {
     withPool(poolName) {
-      val partitionPredicate = if (partitionName != null) field("`partition`").eq(partitionName) else value(1).eq(value(1))
+      val partitionPredicate = if (partitionName != null) field(name("partition")).eq(partitionName) else value(1).eq(value(1))
 
       val orchestrationsQuery = jooq.selectCount()
         .from(ORCHESTRATION.tableName)
@@ -748,29 +749,35 @@
 
     val (executionId, legacyId) = mapLegacyId(ctx, tableName, execution.id, execution.startTime)
 
-    val insertPairs = mapOf(
+    val insertPairs = mutableMapOf(
       field("id") to executionId,
       field("legacy_id") to legacyId,
       field(name("partition")) to partitionName,
       field("status") to status,
       field("application") to execution.application,
       field("build_time") to (execution.buildTime ?: currentTimeMillis()),
-      field("start_time") to execution.startTime,
       field("canceled") to execution.isCanceled,
       field("updated_at") to currentTimeMillis(),
       field("body") to body
     )
 
-    val updatePairs = mapOf(
+    val updatePairs = mutableMapOf(
       field("status") to status,
       field("body") to body,
       field(name("partition")) to partitionName, // won't have started on insert
-      field("start_time") to execution.startTime,
       field("canceled") to execution.isCanceled,
       field("updated_at") to currentTimeMillis()
     )
 
+    // Set startTime only if it is not null
+    // jooq has some issues casting nulls when updating in the Postgres dialect
+    val startTime = execution.startTime
+    if (startTime != null) {
+      insertPairs[field("start_time")] = startTime
+      updatePairs[field("start_time")] = startTime
+    }
+
     when (execution.type) {
       PIPELINE -> upsert(
         ctx,
@@ -881,9 +888,21 @@
     try {
       ctx.insertInto(table, *insertPairs.keys.toTypedArray())
         .values(insertPairs.values)
-        .onDuplicateKeyUpdate()
-        .set(updatePairs)
-        .execute()
+        .run {
+          when (jooq.dialect()) {
+            SQLDialect.POSTGRES -> {
+              onConflict(DSL.field("id"))
+                .doUpdate()
+                .set(updatePairs)
+                .execute()
+            }
+            else -> {
+              onDuplicateKeyUpdate()
+                .set(updatePairs)
+                .execute()
+            }
+          }
+        }
     } catch (e: SQLDialectNotSupportedException) {
       log.debug("Falling back to primitive upsert logic: ${e.message}")
       val exists = ctx.fetchExists(ctx.select().from(table).where(field("id").eq(updateId)).forUpdate())
@@ -990,7 +1009,7 @@
     seek: (SelectConnectByStep) -> SelectForUpdateStep
   ) =
     select(fields)
-      .from(type.tableName.forceIndex(usingIndex))
+      .from(if (jooq.dialect() == SQLDialect.MYSQL) type.tableName.forceIndex(usingIndex) else type.tableName)
      .let { conditions(it) }
      .let { seek(it) }
@@ -999,7 +1018,7 @@
       .from(type.tableName)
 
   private fun selectFields() =
-    listOf(field("id"), field("body"), field("`partition`"))
+    listOf(field("id"), field("body"), field(name("partition")))
 
   private fun SelectForUpdateStep.fetchExecutions() =
     ExecutionMapper(mapper, stageReadSize).map(fetch().intoResultSet(), jooq)
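Two smaller portability details ride along in SqlExecutionRepository. First, start_time is now written only when non-null, because (per the inline comment) jOOQ has trouble casting null bind values when updating in the Postgres dialect. Second, FORCE INDEX is MySQL-only syntax and Postgres exposes no index hints at all, so the hint is applied only under the MySQL dialect and the Postgres planner is left to choose. The guard amounts to a helper like this; a sketch, assuming the forceIndex in the diff corresponds to jOOQ's MySQL-style Table.forceIndex hint, and the helper name is hypothetical:

```kotlin
import org.jooq.DSLContext
import org.jooq.Record
import org.jooq.SQLDialect
import org.jooq.Table

// Apply a MySQL index hint only where the syntax exists; any other
// dialect uses the table as-is and relies on its query planner.
fun <R : Record> withIndexHint(jooq: DSLContext, table: Table<R>, index: String): Table<R> =
  if (jooq.dialect() == SQLDialect.MYSQL) table.forceIndex(index) else table
```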
diff --git a/orca-sql/src/main/resources/db/changelog/20190911-notification-cluster-lock.yml b/orca-sql/src/main/resources/db/changelog/20190911-notification-cluster-lock.yml
index c88c74e49a..c7679e6ee5 100644
--- a/orca-sql/src/main/resources/db/changelog/20190911-notification-cluster-lock.yml
+++ b/orca-sql/src/main/resources/db/changelog/20190911-notification-cluster-lock.yml
@@ -2,6 +2,9 @@ databaseChangeLog:
 - changeSet:
     id: create-notification-lock-table
     author: afeldman
+    validCheckSum:
+      # Original changeset checksum
+      - 8:4f24acc265a458e399dc36f607a74f90
     changes:
     - createTable:
         tableName: notification_lock
@@ -14,6 +17,6 @@
               nullable: false
         - column:
             name: expiry
-            type: bigint(13)
+            type: bigint
             constraints:
               nullable: false
diff --git a/orca-sql/src/main/resources/db/changelog/20200327-deleted-executions-table.yml b/orca-sql/src/main/resources/db/changelog/20200327-deleted-executions-table.yml
index 9e02a284cd..ada75d587c 100644
--- a/orca-sql/src/main/resources/db/changelog/20200327-deleted-executions-table.yml
+++ b/orca-sql/src/main/resources/db/changelog/20200327-deleted-executions-table.yml
@@ -2,10 +2,38 @@ databaseChangeLog:
 - changeSet:
     id: create-deleted-executions-table
     author: mvulfson
+    validCheckSum:
+      # Original changeset checksum
+      - 8:847dd443c55dcb0a47a9576b891ad5ef
     changes:
+    - createTable:
+        tableName: deleted_executions
+        columns:
+        - column:
+            name: id
+            type: int
+            autoIncrement: true
+            constraints:
+              primaryKey: true
+              nullable: false
+        - column:
+            name: execution_id
+            type: varchar(26)
+            constraints:
+              nullable: false
+        - column:
+            name: deleted_at
+            type: datetime
+            constraints:
+              nullable: false
+        - column:
+            name: execution_type
+            type: varchar(26)
+            constraints:
+              nullable: false
     - sql:
         dbms: mysql
-        sql: CREATE TABLE `deleted_executions` (id INT AUTO_INCREMENT, execution_id char(26) NOT NULL, execution_type ENUM("pipeline", "orchestration") NOT NULL, deleted_at DATETIME NOT NULL, CONSTRAINT deleted_executions_pk PRIMARY KEY (id))
+        sql: ALTER TABLE `deleted_executions` MODIFY COLUMN `execution_type` ENUM("pipeline", "orchestration") NOT NULL
     rollback:
     - dropTable:
         tableName: deleted_executions
diff --git a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy
index 8e6c32621b..09b42a1a2d 100644
--- a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy
+++ b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy
@@ -38,6 +38,7 @@ import spock.lang.Shared
 import spock.lang.Specification
 
 import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.cleanupDb
+import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initTcPostgresDatabase
 import static com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE
 import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initTcMysqlDatabase
 import static java.time.temporal.ChronoUnit.DAYS
@@ -144,3 +145,10 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification {
     return e
   }
 }
+
+class PgOldPipelineCleanupPollingNotificationAgentSpec extends OldPipelineCleanupPollingNotificationAgentSpec {
+  def setupSpec() {
+    currentDatabase = initTcPostgresDatabase()
+    executionRepository = new SqlExecutionRepository("test", currentDatabase.context, mapper, new RetryProperties(), 10, 100, "poolName", null)
+  }
+}
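A note on the two validCheckSum entries: Liquibase stores an MD5 checksum for every applied changeset and aborts if a changeset's current content no longer matches it. Both changesets here are edited in place (bigint(13) becomes plain bigint, and the MySQL-specific CREATE TABLE becomes a portable createTable plus a MySQL-only ALTER for the ENUM column), so listing the original checksum under validCheckSum lets existing MySQL installations accept the rewritten changesets without re-running them, while fresh Postgres installations execute the portable definitions from scratch.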
diff --git a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlPipelineExecutionRepositorySpec.groovy b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlPipelineExecutionRepositorySpec.groovy
index 3a69625b87..c87ff255df 100644
--- a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlPipelineExecutionRepositorySpec.groovy
+++ b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlPipelineExecutionRepositorySpec.groovy
@@ -40,15 +40,15 @@ import spock.lang.AutoCleanup
 import spock.lang.Shared
 import spock.lang.Unroll
 
-import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initPreviousTcMysqlDatabase
+import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.cleanupDb
+import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initDualTcMysqlDatabases
+import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initDualTcPostgresDatabases
 import static com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE
 import static com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator.BUILD_TIME_ASC
 import static com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator.BUILD_TIME_DESC
 import static com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria
 import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.orchestration
 import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.pipeline
-import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.cleanupDb
-import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initTcMysqlDatabase
 
 class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<SqlExecutionRepository> {
@@ -69,13 +69,12 @@ class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<SqlExecutionRepository> {
   TestDatabase previousDatabase
 
   def setupSpec() {
-    currentDatabase = initTcMysqlDatabase()
-    previousDatabase = initPreviousTcMysqlDatabase()
+    currentDatabase = initDualTcMysqlDatabases()
   }
 
   def cleanup() {
     cleanupDb(currentDatabase.context)
-    cleanupDb(previousDatabase.context)
+    cleanupDb(currentDatabase.previousContext)
   }
 
   @Override
@@ -85,7 +84,7 @@ class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<SqlExecutionRepository> {
 
   @Override
   ExecutionRepository createExecutionRepositoryPrevious() {
-    new SqlExecutionRepository("test", previousDatabase.context, mapper, new RetryProperties(), 10, 100, "poolName", null)
+    new SqlExecutionRepository("test", currentDatabase.previousContext, mapper, new RetryProperties(), 10, 100, "poolName", null)
   }
 
   ExecutionRepository createExecutionRepository(String partition, Interlink interlink = null) {
@@ -154,7 +153,7 @@
 
     currentDatabase.context
       .update(DSL.table("pipelines"))
-      .set(DSL.field("`partition`"), DSL.value("foreign"))
+      .set(DSL.field(DSL.name("partition")), DSL.value("foreign"))
       .execute()
 
     when:
@@ -200,7 +199,7 @@
 
     currentDatabase.context
       .update(DSL.table("pipelines"))
-      .set(DSL.field("`partition`"), DSL.value("foreign"))
+      .set(DSL.field(DSL.name("partition")), DSL.value("foreign"))
       .execute()
 
     when:
@@ -667,3 +666,9 @@
     ).size() == 0
   }
 }
+
+class PgSqlPipelineExecutionRepositorySpec extends SqlPipelineExecutionRepositorySpec {
+  def setupSpec() {
+    currentDatabase = initDualTcPostgresDatabases()
+  }
+}
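The test changes follow one pattern: each Pg* class subclasses an existing Spock specification and overrides setupSpec to start a Testcontainers Postgres instead of MySQL, so the entire inherited suite runs against both engines. The move from initTcMysqlDatabase/initPreviousTcMysqlDatabase to the initDual* helpers folds the previous-schema-version database into the same TestDatabase handle (exposed as currentDatabase.previousContext), which is what lets the Postgres subclass swap both contexts with a single call in setupSpec.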
diff --git a/orca-web/orca-web.gradle b/orca-web/orca-web.gradle
index 404fbdbc15..3865436103 100644
--- a/orca-web/orca-web.gradle
+++ b/orca-web/orca-web.gradle
@@ -72,6 +72,7 @@ dependencies {
 
   if (!rootProject.hasProperty("excludeSqlDrivers")) {
     runtimeOnly(project(":orca-sql-mysql"))
+    runtimeOnly(project(":orca-sql-postgres"))
   }
 
   compileOnly("org.projectlombok:lombok")
diff --git a/settings.gradle b/settings.gradle
index 2d8def2ec8..3a2efd5208 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -56,6 +56,7 @@ include "orca-api",
   "orca-redis",
   "orca-retrofit",
   "orca-sql-mysql",
+  "orca-sql-postgres",
   "orca-sql",
   "orca-test-groovy",
   "orca-test-kotlin",