Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

fix(sql): PostgreSQL compatibility for keiko-sql #176

Merged
merged 2 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties
import com.netflix.spinnaker.q.DeadMessageCallback
import com.netflix.spinnaker.q.Message
import com.netflix.spinnaker.q.Queue
import com.netflix.spinnaker.q.sql.util.createTableLike
import com.netflix.spinnaker.q.sql.util.excluded
import de.huxhorn.sulky.ulid.ULID
import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.RetryConfig
Expand All @@ -16,6 +18,7 @@ import java.nio.charset.StandardCharsets
import java.time.Clock
import java.time.Duration
import org.jooq.DSLContext
import org.jooq.SQLDialect
import org.jooq.exception.SQLDialectNotSupportedException
import org.jooq.impl.DSL
import org.jooq.util.mysql.MySQLDSL
Expand Down Expand Up @@ -70,17 +73,29 @@ class SqlDeadMessageHandler(
.set(fingerprintField, fingerprint)
.set(updatedAtField, clock.millis())
.set(bodyField, json)
.onDuplicateKeyUpdate()
.set(updatedAtField, MySQLDSL.values(updatedAtField) as Any)
.set(bodyField, MySQLDSL.values(bodyField) as Any)
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES ->
onConflict(fingerprintField)
.doUpdate()
.set(updatedAtField, clock.millis())
.set(bodyField, excluded(bodyField) as Any)
.execute()
else ->
onDuplicateKeyUpdate()
.set(updatedAtField, MySQLDSL.values(updatedAtField) as Any)
.set(bodyField, MySQLDSL.values(bodyField) as Any)
.execute()
}
}
}
} catch (e: Exception) {
log.error("Failed to deadLetter message, fingerprint: $fingerprint, message: $json", e)
}
}

private fun initTables() {
jooq.execute("CREATE TABLE IF NOT EXISTS $dlqTableName LIKE ${dlqBase}_template")
createTableLike(dlqTableName, "${dlqBase}_template", jooq)
}

@Suppress("UnstableApiUsage")
Expand Down
81 changes: 65 additions & 16 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import com.netflix.spinnaker.q.metrics.RetryPolled
import com.netflix.spinnaker.q.migration.SerializationMigrator
import com.netflix.spinnaker.q.sql.SqlQueue.RetryCategory.READ
import com.netflix.spinnaker.q.sql.SqlQueue.RetryCategory.WRITE
import com.netflix.spinnaker.q.sql.util.createTableLike
import com.netflix.spinnaker.q.sql.util.excluded
import de.huxhorn.sulky.ulid.ULID
import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.RetryConfig
Expand All @@ -47,6 +49,7 @@ import kotlin.math.min
import kotlin.random.Random.Default.nextLong
import org.funktionale.partials.partially1
import org.jooq.DSLContext
import org.jooq.SQLDialect
import org.jooq.SortOrder
import org.jooq.exception.SQLDialectNotSupportedException
import org.jooq.impl.DSL
Expand Down Expand Up @@ -371,8 +374,17 @@ class SqlQueue(
.set(idField, ulid.toString())
.set(fingerprintField, m.fingerprint)
.set(expiryField, m.expiry)
.onDuplicateKeyIgnore()
.execute()
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES ->
onConflict(fingerprintField)
.doNothing()
.execute()
else ->
onDuplicateKeyIgnore()
.execute()
}
}

when (changed) {
0 -> toRelease.add(m.queueId)
Expand Down Expand Up @@ -444,25 +456,46 @@ class SqlQueue(
withRetry(WRITE) {
jooq.transaction { config ->
val txn = DSL.using(config)
val bodyVal = mapper.writeValueAsString(message)

txn.insertInto(messagesTable)
.set(idField, ulid.toString())
.set(fingerprintField, fingerprint)
.set(bodyField, mapper.writeValueAsString(message))
.set(bodyField, bodyVal)
.set(updatedAtField, clock.millis())
.onDuplicateKeyUpdate()
.set(idField, MySQLDSL.values(idField) as Any)
.set(bodyField, MySQLDSL.values(bodyField) as Any)
.execute()
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES ->
onConflict(fingerprintField)
.doUpdate()
.set(bodyField, excluded(bodyField) as Any)
.execute()
else ->
onDuplicateKeyUpdate()
.set(idField, MySQLDSL.values(idField) as Any)
.set(bodyField, MySQLDSL.values(bodyField) as Any)
.execute()
}
}

txn.insertInto(queueTable)
.set(idField, ULID.nextMonotonicValue(ulid).toString())
.set(fingerprintField, fingerprint)
.set(deliveryField, deliveryTime)
.set(lockedField, "0")
.onDuplicateKeyUpdate()
.set(deliveryField, MySQLDSL.values(deliveryField) as Any)
.execute()
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES ->
onConflict(fingerprintField)
.doUpdate()
.set(deliveryField, deliveryTime)
.execute()
else ->
onDuplicateKeyUpdate()
.set(deliveryField, MySQLDSL.values(deliveryField) as Any)
.execute()
}
}
}
}

Expand Down Expand Up @@ -680,9 +713,19 @@ class SqlQueue(
.set(fingerprintField, fingerprint)
.set(deliveryField, atTime(lockTtlDuration))
.set(lockedField, "0")
.onDuplicateKeyUpdate()
.set(deliveryField, MySQLDSL.values(deliveryField) as Any)
.execute()
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES ->
onConflict(fingerprintField)
.doUpdate()
.set(deliveryField, atTime(lockTtlDuration))
.execute()
else ->
onDuplicateKeyUpdate()
.set(deliveryField, MySQLDSL.values(deliveryField) as Any)
.execute()
}
}
}
}

Expand Down Expand Up @@ -808,11 +851,17 @@ class SqlQueue(
}

private fun initTables() {
val tables = listOf(
Pair(queueTableName, queueBase),
Pair(unackedTableName, unackedBase),
Pair(messagesTableName, messagesBase)
)

withPool(poolName) {
withRetry(WRITE) {
jooq.execute("CREATE TABLE IF NOT EXISTS $queueTableName LIKE ${queueBase}_template")
jooq.execute("CREATE TABLE IF NOT EXISTS $unackedTableName LIKE ${unackedBase}_template")
jooq.execute("CREATE TABLE IF NOT EXISTS $messagesTableName LIKE ${messagesBase}_template")
for (tablePair in tables) {
createTableLike(tablePair.first, "${tablePair.second}_template", jooq)
}
}
}
}
Expand Down
44 changes: 44 additions & 0 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/util/Util.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2020 Apple, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.q.sql.util

import org.jooq.DSLContext
import org.jooq.Field
import org.jooq.SQLDialect
import org.jooq.impl.DSL

fun currentSchema(context: DSLContext): String {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: these two could be made into kotlin extension method on DSLContext

return context.fetch("select current_schema()")
.getValue(0, DSL.field("current_schema")).toString()
}

fun createTableLike(newTable: String, templateTable: String, context: DSLContext) {
var sql = "CREATE TABLE IF NOT EXISTS "
sql += when (context.dialect()) {
SQLDialect.POSTGRES -> {
val cs = currentSchema(context)
"$cs.$newTable ( LIKE $cs.$templateTable INCLUDING ALL )"
}
else -> "$newTable LIKE $templateTable"
}
context.execute(sql)
}

// Allows insertion of virtual Postgres values on conflict, similar to MySQLDSL.values
fun <T> excluded(values: Field<T>): Field<T> {
return DSL.field("excluded.{0}", values.dataType, values)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ databaseChangeLog:
- changeSet:
id: create-keiko-queue-table-v1
author: afeldman
validCheckSum:
# Original changeset checksum
- 8:551ab623136d3c624d7fd4928d0f4c51
changes:
- createTable:
tableName: keiko_v1_queue_template
Expand All @@ -19,7 +22,7 @@ databaseChangeLog:
nullable: false
- column:
name: delivery
type: bigint(13)
type: bigint
constraints:
nullable: false
- column:
Expand Down Expand Up @@ -175,6 +178,9 @@ databaseChangeLog:
- changeSet:
id: create-keiko-dlq-table-v1
author: afeldman
validCheckSum:
# Original changeset checksum
- 8:7b2c146b7f0ff7de56fdb729c8e87277
changes:
- createTable:
tableName: keiko_v1_dlq_template
Expand All @@ -192,7 +198,7 @@ databaseChangeLog:
nullable: false
- column:
name: updated_at
type: bigint(13)
type: bigint
constraints:
nullable: false
- column:
Expand Down Expand Up @@ -233,13 +239,16 @@ databaseChangeLog:
- changeSet:
id: add-keiko-queue-messages-updated-col
author: afeldman
validCheckSum:
# Original changeset checksum
- 8:53370973bc79095573dfe46d41cffae7
changes:
- addColumn:
tableName: keiko_v1_messages_template
columns:
- column:
name: updated_at
type: bigint(13)
type: bigint
defaultValue: 0
constraints:
nullable: false
Expand Down