Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sql): PostgreSQL compatibility for executions and partitions #3812

Merged
merged 5 commits into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ abstract class PipelineExecutionRepositoryTck<T extends ExecutionRepository> ext
buildTime = 0
trigger = new DefaultTrigger("manual")
}
// our ULID implementation isn't monotonic
sleep(5)
def succeededExecution = orchestration {
status = SUCCEEDED
buildTime = 0
Expand All @@ -157,8 +159,6 @@ abstract class PipelineExecutionRepositoryTck<T extends ExecutionRepository> ext

when:
repository.store(runningExecution)
// our ULID implementation isn't monotonic
sleep(5)
repository.store(succeededExecution)
def orchestrations = repository.retrieveOrchestrationsForApplication(
runningExecution.application,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ open class ExecutionCopier(

// Step 2: Copy all executions
executionRows.forEach { r ->
r.set(DSL.field("partition"), peeredId)
r.set(DSL.field(DSL.name("partition")), peeredId)
latestUpdatedAt = max(latestUpdatedAt, r.get("updated_at", Long::class.java))
}
destDB.loadRecords(getExecutionTable(executionType).name, executionRows)
Expand Down
4 changes: 4 additions & 0 deletions orca-sql-postgres/orca-sql-postgres.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dependencies {
implementation(project(":orca-sql"))
runtimeOnly("org.postgresql:postgresql")
}
2 changes: 2 additions & 0 deletions orca-sql/orca-sql.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ dependencies {
testImplementation("dev.minutest:minutest")
testImplementation("com.nhaarman:mockito-kotlin")
testImplementation("org.testcontainers:mysql")
testImplementation("org.testcontainers:postgresql")

testRuntimeOnly("mysql:mysql-connector-java")
testRuntimeOnly("org.postgresql:postgresql")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.vavr.control.Try
import java.time.Clock
import java.time.Duration
import org.jooq.DSLContext
import org.jooq.SQLDialect
import org.jooq.exception.SQLDialectNotSupportedException
import org.jooq.impl.DSL
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -52,8 +53,19 @@ class SqlNotificationClusterLock(
jooq.insertInto(lockTable)
.set(lockField, notificationType)
.set(expiryField, now.plusSeconds(lockTimeoutSeconds).toEpochMilli())
.onDuplicateKeyIgnore()
.execute()
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES -> {
onConflict(DSL.field("lock_name"))
.doNothing()
.execute()
}
else -> {
onDuplicateKeyIgnore()
.execute()
}
}
}
}

if (changed == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger
import org.jooq.DSLContext
import org.jooq.impl.DSL.count
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.name
import org.jooq.impl.DSL.table
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.boot.context.properties.EnableConfigurationProperties
Expand Down Expand Up @@ -84,7 +85,7 @@ class OldPipelineCleanupPollingNotificationAgent(
if (orcaSqlProperties.partitionName != null) {
queryBuilder = queryBuilder
.and(
field("`partition`").eq(orcaSqlProperties.partitionName))
field(name("partition")).eq(orcaSqlProperties.partitionName))
}

val pipelineConfigsWithOldExecutions = queryBuilder
Expand Down Expand Up @@ -145,7 +146,7 @@ class OldPipelineCleanupPollingNotificationAgent(
if (orcaSqlProperties.partitionName != null) {
queryBuilder = queryBuilder
.and(
field("`partition`").eq(orcaSqlProperties.partitionName))
field(name("partition")).eq(orcaSqlProperties.partitionName))
}

val executionsToRemove = queryBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import java.util.concurrent.atomic.AtomicInteger
import org.jooq.DSLContext
import org.jooq.impl.DSL
import org.jooq.impl.DSL.name
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.stereotype.Component
Expand Down Expand Up @@ -62,7 +63,7 @@ class TopApplicationExecutionCleanupPollingNotificationAgent(
.where(if (orcaSqlProperties.partitionName == null) {
DSL.noCondition()
} else {
DSL.field("`partition`").eq(orcaSqlProperties.partitionName)
DSL.field(name("partition")).eq(orcaSqlProperties.partitionName)
})
.and(DSL.field("application").`in`(*chunk.toTypedArray()))
.groupBy(DSL.field("application"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ExecutionMapper(
mapper.readValue<PipelineExecution>(rs.getString("body"))
.also {
execution -> results.add(execution)
execution.partition = rs.getString("`partition`")
execution.partition = rs.getString("partition")

if (rs.getString("id") != execution.id) {
// Map legacyId executions to their current ULID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import org.jooq.DSLContext
import org.jooq.DatePart
import org.jooq.Field
import org.jooq.Record
import org.jooq.SQLDialect
import org.jooq.SelectConditionStep
import org.jooq.SelectConnectByStep
import org.jooq.SelectForUpdateStep
Expand Down Expand Up @@ -601,7 +602,7 @@ class SqlExecutionRepository(

override fun countActiveExecutions(): ActiveExecutionsReport {
withPool(poolName) {
val partitionPredicate = if (partitionName != null) field("`partition`").eq(partitionName) else value(1).eq(value(1))
val partitionPredicate = if (partitionName != null) field(name("partition")).eq(partitionName) else value(1).eq(value(1))

val orchestrationsQuery = jooq.selectCount()
.from(ORCHESTRATION.tableName)
Expand Down Expand Up @@ -748,29 +749,35 @@ class SqlExecutionRepository(

val (executionId, legacyId) = mapLegacyId(ctx, tableName, execution.id, execution.startTime)

val insertPairs = mapOf(
val insertPairs = mutableMapOf(
field("id") to executionId,
field("legacy_id") to legacyId,
field(name("partition")) to partitionName,
field("status") to status,
field("application") to execution.application,
field("build_time") to (execution.buildTime ?: currentTimeMillis()),
field("start_time") to execution.startTime,
field("canceled") to execution.isCanceled,
field("updated_at") to currentTimeMillis(),
field("body") to body
)

val updatePairs = mapOf(
val updatePairs = mutableMapOf(
field("status") to status,
field("body") to body,
field(name("partition")) to partitionName,
// won't have started on insert
field("start_time") to execution.startTime,
field("canceled") to execution.isCanceled,
field("updated_at") to currentTimeMillis()
)

// Set startTime only if it is not null
// jooq has some issues casting nulls when updating in the Postgres dialect
val startTime = execution.startTime
if (startTime != null) {
insertPairs[field("start_time")] = startTime
updatePairs[field("start_time")] = startTime
}

when (execution.type) {
PIPELINE -> upsert(
ctx,
Expand Down Expand Up @@ -881,9 +888,21 @@ class SqlExecutionRepository(
try {
ctx.insertInto(table, *insertPairs.keys.toTypedArray())
.values(insertPairs.values)
.onDuplicateKeyUpdate()
.set(updatePairs)
.execute()
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES -> {
onConflict(DSL.field("id"))
.doUpdate()
.set(updatePairs)
.execute()
}
else -> {
onDuplicateKeyUpdate()
.set(updatePairs)
.execute()
}
}
}
} catch (e: SQLDialectNotSupportedException) {
log.debug("Falling back to primitive upsert logic: ${e.message}")
val exists = ctx.fetchExists(ctx.select().from(table).where(field("id").eq(updateId)).forUpdate())
Expand Down Expand Up @@ -990,7 +1009,7 @@ class SqlExecutionRepository(
seek: (SelectConnectByStep<out Record>) -> SelectForUpdateStep<out Record>
) =
select(fields)
.from(type.tableName.forceIndex(usingIndex))
.from(if (jooq.dialect() == SQLDialect.MYSQL) type.tableName.forceIndex(usingIndex) else type.tableName)
.let { conditions(it) }
.let { seek(it) }

Expand All @@ -999,7 +1018,7 @@ class SqlExecutionRepository(
.from(type.tableName)

private fun selectFields() =
listOf(field("id"), field("body"), field("`partition`"))
listOf(field("id"), field("body"), field(name("partition")))

private fun SelectForUpdateStep<out Record>.fetchExecutions() =
ExecutionMapper(mapper, stageReadSize).map(fetch().intoResultSet(), jooq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ databaseChangeLog:
- changeSet:
id: create-notification-lock-table
author: afeldman
validCheckSum:
# Original changeset checksum
- 8:4f24acc265a458e399dc36f607a74f90
changes:
- createTable:
tableName: notification_lock
Expand All @@ -14,6 +17,6 @@ databaseChangeLog:
nullable: false
- column:
name: expiry
type: bigint(13)
type: bigint
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't validCheckSum be required here as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed it would be - I've added the original checksum to the migration.

constraints:
nullable: false
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,38 @@ databaseChangeLog:
- changeSet:
id: create-deleted-executions-table
author: mvulfson
validCheckSum:
# Original changeset checksum
- 8:847dd443c55dcb0a47a9576b891ad5ef
changes:
- createTable:
tableName: deleted_executions
columns:
- column:
name: id
type: int
autoIncrement: true
constraints:
primaryKey: true
nullable: false
- column:
name: execution_id
type: varchar(26)
constraints:
nullable: false
- column:
name: deleted_at
type: datetime
constraints:
nullable: false
- column:
name: execution_type
type: varchar(26)
constraints:
nullable: false
- sql:
dbms: mysql
sql: CREATE TABLE `deleted_executions` (id INT AUTO_INCREMENT, execution_id char(26) NOT NULL, execution_type ENUM("pipeline", "orchestration") NOT NULL, deleted_at DATETIME NOT NULL, CONSTRAINT deleted_executions_pk PRIMARY KEY (id))
sql: ALTER TABLE `deleted_executions` MODIFY COLUMN `execution_type` ENUM("pipeline", "orchestration") NOT NULL
Comment on lines -8 to +36
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏

rollback:
- dropTable:
tableName: deleted_executions
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import spock.lang.Shared
import spock.lang.Specification

import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.cleanupDb
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initTcPostgresDatabase
import static com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initTcMysqlDatabase
import static java.time.temporal.ChronoUnit.DAYS
Expand Down Expand Up @@ -144,3 +145,10 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification {
return e
}
}

class PgOldPipelineCleanupPollingNotificationAgentSpec extends OldPipelineCleanupPollingNotificationAgentSpec {
def setupSpec() {
currentDatabase = initTcPostgresDatabase()
executionRepository = new SqlExecutionRepository("test", currentDatabase.context, mapper, new RetryProperties(), 10, 100, "poolName", null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Unroll

import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initPreviousTcMysqlDatabase
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.cleanupDb
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initDualTcMysqlDatabases
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initDualTcPostgresDatabases
import static com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE
import static com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator.BUILD_TIME_ASC
import static com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator.BUILD_TIME_DESC
import static com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria
import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.orchestration
import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.pipeline
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.cleanupDb
import static com.netflix.spinnaker.kork.sql.test.SqlTestUtil.initTcMysqlDatabase

class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<ExecutionRepository> {

Expand All @@ -69,13 +69,12 @@ class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<
TestDatabase previousDatabase

def setupSpec() {
currentDatabase = initTcMysqlDatabase()
previousDatabase = initPreviousTcMysqlDatabase()
currentDatabase = initDualTcMysqlDatabases()
}

def cleanup() {
cleanupDb(currentDatabase.context)
cleanupDb(previousDatabase.context)
cleanupDb(currentDatabase.previousContext)
}

@Override
Expand All @@ -85,7 +84,7 @@ class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<

@Override
ExecutionRepository createExecutionRepositoryPrevious() {
new SqlExecutionRepository("test", previousDatabase.context, mapper, new RetryProperties(), 10, 100, "poolName", null)
new SqlExecutionRepository("test", currentDatabase.previousContext, mapper, new RetryProperties(), 10, 100, "poolName", null)
}

ExecutionRepository createExecutionRepository(String partition, Interlink interlink = null) {
Expand Down Expand Up @@ -154,7 +153,7 @@ class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<

currentDatabase.context
.update(DSL.table("pipelines"))
.set(DSL.field("`partition`"), DSL.value("foreign"))
.set(DSL.field(DSL.name("partition")), DSL.value("foreign"))
.execute()

when:
Expand Down Expand Up @@ -200,7 +199,7 @@ class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<

currentDatabase.context
.update(DSL.table("pipelines"))
.set(DSL.field("`partition`"), DSL.value("foreign"))
.set(DSL.field(DSL.name("partition")), DSL.value("foreign"))
.execute()

when:
Expand Down Expand Up @@ -667,3 +666,9 @@ class SqlPipelineExecutionRepositorySpec extends PipelineExecutionRepositoryTck<
).size() == 0
}
}

class PgSqlPipelineExecutionRepositorySpec extends SqlPipelineExecutionRepositorySpec {
def setupSpec() {
currentDatabase = initDualTcPostgresDatabases()
}
}
1 change: 1 addition & 0 deletions orca-web/orca-web.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ dependencies {

if (!rootProject.hasProperty("excludeSqlDrivers")) {
runtimeOnly(project(":orca-sql-mysql"))
runtimeOnly(project(":orca-sql-postgres"))
}

compileOnly("org.projectlombok:lombok")
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ include "orca-api",
"orca-redis",
"orca-retrofit",
"orca-sql-mysql",
"orca-sql-postgres",
"orca-sql",
"orca-test-groovy",
"orca-test-kotlin",
Expand Down