fix(sql): PostgreSQL compatibility for executions and partitions (spinnaker#3812)

* fix(sql): PostgreSQL compatibility for executions and partitions

Includes the following changes:

- Remove bigint display width qualifiers - these are MySQL-specific and display-only
- Avoid `forceIndex` on PostgreSQL - it does not support that feature
- Remove MySQL-specific backticks around the `partition` column and use `DSL.name` throughout
- Expand queries that relied on MySQL-specific upsert features with PostgreSQL-compatible variants (see the sketch after the changed-files summary below)
- Add new PostgreSQL module with runtimeOnly driver dependency
- Add PostgreSQL tests for orca-sql

* Update kork/keiko pins to include required dependency changes

Co-authored-by: Adam Jordens <adam@jordens.org>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
3 people authored Jul 16, 2020
1 parent 92883d5 commit 0bb15f3
Showing 15 changed files with 115 additions and 30 deletions.
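
The common thread across the diffs below is dialect awareness: reserved identifiers such as the partition column are built with DSL.name so jOOQ applies each database's own quoting, and upserts branch on jooq.dialect(), using PostgreSQL's ON CONFLICT syntax there and jOOQ's onDuplicateKey* methods elsewhere. The following minimal Kotlin sketch shows that upsert pattern in isolation; the tryAcquireLock helper is illustrative only and not part of the commit, though the notification_lock table and its columns mirror the lock code changed below.

    import org.jooq.DSLContext
    import org.jooq.SQLDialect
    import org.jooq.impl.DSL.field
    import org.jooq.impl.DSL.name
    import org.jooq.impl.DSL.table

    // Illustrative helper (not from the commit): insert a lock row, ignoring duplicates,
    // in a way that works on both MySQL and PostgreSQL.
    fun tryAcquireLock(jooq: DSLContext, lockName: String, expiryMillis: Long): Int {
      val insert = jooq
        .insertInto(table("notification_lock"), field(name("lock_name")), field("expiry"))
        .values(lockName, expiryMillis)

      return when (jooq.dialect()) {
        // PostgreSQL: INSERT ... ON CONFLICT (lock_name) DO NOTHING
        SQLDialect.POSTGRES -> insert.onConflict(field(name("lock_name"))).doNothing().execute()
        // MySQL and other dialects: jOOQ's ON DUPLICATE KEY IGNORE handling
        else -> insert.onDuplicateKeyIgnore().execute()
      }
    }

The same quoting logic explains the partition changes: field(name("partition")) renders as `partition` on MySQL and "partition" on PostgreSQL, whereas the previous backtick-quoted literals only parsed on MySQL.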
@@ -149,6 +149,8 @@ abstract class PipelineExecutionRepositoryTck<T extends ExecutionRepository> ext
buildTime = 0
trigger = new DefaultTrigger("manual")
}
// our ULID implementation isn't monotonic
sleep(5)
def succeededExecution = orchestration {
status = SUCCEEDED
buildTime = 0
@@ -157,8 +159,6 @@ abstract class PipelineExecutionRepositoryTck<T extends ExecutionRepository> ext

when:
repository.store(runningExecution)
// our ULID implementation isn't monotonic
sleep(5)
repository.store(succeededExecution)
def orchestrations = repository.retrieveOrchestrationsForApplication(
runningExecution.application,
@@ -160,7 +160,7 @@ open class ExecutionCopier(

// Step 2: Copy all executions
executionRows.forEach { r ->
r.set(DSL.field("partition"), peeredId)
r.set(DSL.field(DSL.name("partition")), peeredId)
latestUpdatedAt = max(latestUpdatedAt, r.get("updated_at", Long::class.java))
}
destDB.loadRecords(getExecutionTable(executionType).name, executionRows)
4 changes: 4 additions & 0 deletions orca-sql-postgres/orca-sql-postgres.gradle
@@ -0,0 +1,4 @@
dependencies {
implementation(project(":orca-sql"))
runtimeOnly("org.postgresql:postgresql")
}
2 changes: 2 additions & 0 deletions orca-sql/orca-sql.gradle
@@ -45,8 +45,10 @@ dependencies {
testImplementation("dev.minutest:minutest")
testImplementation("com.nhaarman:mockito-kotlin")
testImplementation("org.testcontainers:mysql")
testImplementation("org.testcontainers:postgresql")

testRuntimeOnly("mysql:mysql-connector-java")
testRuntimeOnly("org.postgresql:postgresql")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine")
@@ -23,6 +23,7 @@ import io.vavr.control.Try
import java.time.Clock
import java.time.Duration
import org.jooq.DSLContext
import org.jooq.SQLDialect
import org.jooq.exception.SQLDialectNotSupportedException
import org.jooq.impl.DSL
import org.slf4j.LoggerFactory
@@ -52,8 +53,19 @@ class SqlNotificationClusterLock(
jooq.insertInto(lockTable)
.set(lockField, notificationType)
.set(expiryField, now.plusSeconds(lockTimeoutSeconds).toEpochMilli())
.onDuplicateKeyIgnore()
.execute()
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES -> {
onConflict(DSL.field("lock_name"))
.doNothing()
.execute()
}
else -> {
onDuplicateKeyIgnore()
.execute()
}
}
}
}

if (changed == 0) {
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger
import org.jooq.DSLContext
import org.jooq.impl.DSL.count
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.name
import org.jooq.impl.DSL.table
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.boot.context.properties.EnableConfigurationProperties
@@ -84,7 +85,7 @@ class OldPipelineCleanupPollingNotificationAgent(
if (orcaSqlProperties.partitionName != null) {
queryBuilder = queryBuilder
.and(
field("`partition`").eq(orcaSqlProperties.partitionName))
field(name("partition")).eq(orcaSqlProperties.partitionName))
}

val pipelineConfigsWithOldExecutions = queryBuilder
@@ -145,7 +146,7 @@ class OldPipelineCleanupPollingNotificationAgent(
if (orcaSqlProperties.partitionName != null) {
queryBuilder = queryBuilder
.and(
field("`partition`").eq(orcaSqlProperties.partitionName))
field(name("partition")).eq(orcaSqlProperties.partitionName))
}

val executionsToRemove = queryBuilder
@@ -25,6 +25,7 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import java.util.concurrent.atomic.AtomicInteger
import org.jooq.DSLContext
import org.jooq.impl.DSL
import org.jooq.impl.DSL.name
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.stereotype.Component
@@ -62,7 +63,7 @@ class TopApplicationExecutionCleanupPollingNotificationAgent(
.where(if (orcaSqlProperties.partitionName == null) {
DSL.noCondition()
} else {
DSL.field("`partition`").eq(orcaSqlProperties.partitionName)
DSL.field(name("partition")).eq(orcaSqlProperties.partitionName)
})
.and(DSL.field("application").`in`(*chunk.toTypedArray()))
.groupBy(DSL.field("application"))
@@ -45,7 +45,7 @@ class ExecutionMapper(
mapper.readValue<PipelineExecution>(rs.getString("body"))
.also {
execution -> results.add(execution)
execution.partition = rs.getString("`partition`")
execution.partition = rs.getString("partition")

if (rs.getString("id") != execution.id) {
// Map legacyId executions to their current ULID
@@ -54,6 +54,7 @@ import org.jooq.DSLContext
import org.jooq.DatePart
import org.jooq.Field
import org.jooq.Record
import org.jooq.SQLDialect
import org.jooq.SelectConditionStep
import org.jooq.SelectConnectByStep
import org.jooq.SelectForUpdateStep
@@ -601,7 +602,7 @@ class SqlExecutionRepository(

override fun countActiveExecutions(): ActiveExecutionsReport {
withPool(poolName) {
val partitionPredicate = if (partitionName != null) field("`partition`").eq(partitionName) else value(1).eq(value(1))
val partitionPredicate = if (partitionName != null) field(name("partition")).eq(partitionName) else value(1).eq(value(1))

val orchestrationsQuery = jooq.selectCount()
.from(ORCHESTRATION.tableName)
@@ -748,29 +749,35 @@ class SqlExecutionRepository(

val (executionId, legacyId) = mapLegacyId(ctx, tableName, execution.id, execution.startTime)

val insertPairs = mapOf(
val insertPairs = mutableMapOf(
field("id") to executionId,
field("legacy_id") to legacyId,
field(name("partition")) to partitionName,
field("status") to status,
field("application") to execution.application,
field("build_time") to (execution.buildTime ?: currentTimeMillis()),
field("start_time") to execution.startTime,
field("canceled") to execution.isCanceled,
field("updated_at") to currentTimeMillis(),
field("body") to body
)

val updatePairs = mapOf(
val updatePairs = mutableMapOf(
field("status") to status,
field("body") to body,
field(name("partition")) to partitionName,
// won't have started on insert
field("start_time") to execution.startTime,
field("canceled") to execution.isCanceled,
field("updated_at") to currentTimeMillis()
)

// Set startTime only if it is not null
// jooq has some issues casting nulls when updating in the Postgres dialect
val startTime = execution.startTime
if (startTime != null) {
insertPairs[field("start_time")] = startTime
updatePairs[field("start_time")] = startTime
}

when (execution.type) {
PIPELINE -> upsert(
ctx,
@@ -881,9 +888,21 @@ class SqlExecutionRepository(
try {
ctx.insertInto(table, *insertPairs.keys.toTypedArray())
.values(insertPairs.values)
.onDuplicateKeyUpdate()
.set(updatePairs)
.execute()
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES -> {
onConflict(DSL.field("id"))
.doUpdate()
.set(updatePairs)
.execute()
}
else -> {
onDuplicateKeyUpdate()
.set(updatePairs)
.execute()
}
}
}
} catch (e: SQLDialectNotSupportedException) {
log.debug("Falling back to primitive upsert logic: ${e.message}")
val exists = ctx.fetchExists(ctx.select().from(table).where(field("id").eq(updateId)).forUpdate())
@@ -990,7 +1009,7 @@ class SqlExecutionRepository(
seek: (SelectConnectByStep<out Record>) -> SelectForUpdateStep<out Record>
) =
select(fields)
.from(type.tableName.forceIndex(usingIndex))
.from(if (jooq.dialect() == SQLDialect.MYSQL) type.tableName.forceIndex(usingIndex) else type.tableName)
.let { conditions(it) }
.let { seek(it) }

@@ -999,7 +1018,7 @@ class SqlExecutionRepository(
.from(type.tableName)

private fun selectFields() =
listOf(field("id"), field("body"), field("`partition`"))
listOf(field("id"), field("body"), field(name("partition")))

private fun SelectForUpdateStep<out Record>.fetchExecutions() =
ExecutionMapper(mapper, stageReadSize).map(fetch().intoResultSet(), jooq)
@@ -2,6 +2,9 @@ databaseChangeLog:
- changeSet:
id: create-notification-lock-table
author: afeldman
validCheckSum:
# Original changeset checksum
- 8:4f24acc265a458e399dc36f607a74f90
changes:
- createTable:
tableName: notification_lock
@@ -14,6 +17,6 @@ databaseChangeLog:
nullable: false
- column:
name: expiry
type: bigint(13)
type: bigint
constraints:
nullable: false
@@ -2,10 +2,38 @@ databaseChangeLog:
- changeSet:
id: create-deleted-executions-table
author: mvulfson
validCheckSum:
# Original changeset checksum
- 8:847dd443c55dcb0a47a9576b891ad5ef
changes:
- createTable:
tableName: deleted_executions
columns:
- column:
name: id
type: int
autoIncrement: true
constraints:
primaryKey: true
nullable: false
- column:
name: execution_id
type: varchar(26)
constraints:
nullable: false
- column:
name: deleted_at
type: datetime
constraints:
nullable: false
- column:
name: execution_type
type: varchar(26)
constraints:
nullable: false
- sql:
dbms: mysql
sql: CREATE TABLE `deleted_executions` (id INT AUTO_INCREMENT, execution_id char(26) NOT NULL, execution_type ENUM("pipeline", "orchestration") NOT NULL, deleted_at DATETIME NOT NULL, CONSTRAINT deleted_executions_pk PRIMARY KEY (id))
sql: ALTER TABLE `deleted_executions` MODIFY COLUMN `execution_type` ENUM("pipeline", "orchestration") NOT NULL
rollback:
- dropTable:
tableName: deleted_executions
@@ -38,6 +38,7 @@ import spock.lang.Shared
import spock.lang.Specification

import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.cleanupDb
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initTcPostgresDatabase
import static com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initTcMysqlDatabase
import static java.time.temporal.ChronoUnit.DAYS
@@ -144,3 +145,10 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification {
return e
}
}

class PgOldPipelineCleanupPollingNotificationAgentSpec extends OldPipelineCleanupPollingNotificationAgentSpec {
def setupSpec() {
currentDatabase = initTcPostgresDatabase()
executionRepository = new SqlExecutionRepository("test", currentDatabase.context, mapper, new RetryProperties(), 10, 100, "poolName", null)
}
}
@@ -40,15 +40,15 @@ import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Unroll

import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initPreviousTcMysqlDatabase
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.cleanupDb
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initDualTcMysqlDatabases
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initDualTcPostgresDatabases
import static com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE
import static com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator.BUILD_TIME_ASC
import static com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator.BUILD_TIME_DESC
import static com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria
import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.orchestration
import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.pipeline
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.cleanupDb
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initTcMysqlDatabase

class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<ExecutionRepository> {

@@ -69,13 +69,12 @@ class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<
TestDatabase previousDatabase

def setupSpec() {
currentDatabase = initTcMysqlDatabase()
previousDatabase = initPreviousTcMysqlDatabase()
currentDatabase = initDualTcMysqlDatabases()
}

def cleanup() {
cleanupDb(currentDatabase.context)
cleanupDb(previousDatabase.context)
cleanupDb(currentDatabase.previousContext)
}

@Override
@@ -85,7 +84,7 @@ class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<

@Override
ExecutionRepository createExecutionRepositoryPrevious() {
new SqlExecutionRepository("test", previousDatabase.context, mapper, new RetryProperties(), 10, 100, "poolName", null)
new SqlExecutionRepository("test", currentDatabase.previousContext, mapper, new RetryProperties(), 10, 100, "poolName", null)
}

ExecutionRepository createExecutionRepository(String partition, Interlink interlink = null) {
@@ -154,7 +153,7 @@ class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<

currentDatabase.context
.update(DSL.table("pipelines"))
.set(DSL.field("`partition`"), DSL.value("foreign"))
.set(DSL.field(DSL.name("partition")), DSL.value("foreign"))
.execute()

when:
@@ -200,7 +199,7 @@ class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<

currentDatabase.context
.update(DSL.table("pipelines"))
.set(DSL.field("`partition`"), DSL.value("foreign"))
.set(DSL.field(DSL.name("partition")), DSL.value("foreign"))
.execute()

when:
@@ -667,3 +666,9 @@ class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<
).size() == 0
}
}

class PgSqlPipelineExecutionRepositorySpec extends SqlPipelineExecutionRepositorySpec {
def setupSpec() {
currentDatabase = initDualTcPostgresDatabases()
}
}
1 change: 1 addition & 0 deletions orca-web/orca-web.gradle
@@ -72,6 +72,7 @@ dependencies {

if (!rootProject.hasProperty("excludeSqlDrivers")) {
runtimeOnly(project(":orca-sql-mysql"))
runtimeOnly(project(":orca-sql-postgres"))
}

compileOnly("org.projectlombok:lombok")
1 change: 1 addition & 0 deletions settings.gradle
@@ -56,6 +56,7 @@ include "orca-api",
"orca-redis",
"orca-retrofit",
"orca-sql-mysql",
"orca-sql-postgres",
"orca-sql",
"orca-test-groovy",
"orca-test-kotlin",
