Skip to content

Commit

Permalink
Merge pull request #1295 from partiql/expand-join
Browse files Browse the repository at this point in the history
Adds support for FULL/RIGHT OUTER JOIN and improves performance of JOINs
  • Loading branch information
johnedquinn authored Dec 14, 2023
2 parents ff60c72 + ed21f96 commit 9b73804
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import org.partiql.eval.internal.operator.Operator
import org.partiql.eval.internal.operator.rel.RelFilter
import org.partiql.eval.internal.operator.rel.RelJoinInner
import org.partiql.eval.internal.operator.rel.RelJoinLeft
import org.partiql.eval.internal.operator.rel.RelJoinOuterFull
import org.partiql.eval.internal.operator.rel.RelJoinRight
import org.partiql.eval.internal.operator.rel.RelProject
import org.partiql.eval.internal.operator.rel.RelScan
import org.partiql.eval.internal.operator.rex.ExprCollection
Expand Down Expand Up @@ -108,8 +110,8 @@ internal object Compiler {
return when (node.type) {
Rel.Op.Join.Type.INNER -> RelJoinInner(lhs, rhs, condition)
Rel.Op.Join.Type.LEFT -> RelJoinLeft(lhs, rhs, condition)
Rel.Op.Join.Type.RIGHT -> TODO()
Rel.Op.Join.Type.FULL -> TODO()
Rel.Op.Join.Type.RIGHT -> RelJoinRight(lhs, rhs, condition)
Rel.Op.Join.Type.FULL -> RelJoinOuterFull(lhs, rhs, condition)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@ internal class Record(val values: Array<PartiQLValue>) {
/**
 * Concatenates two records: the result holds this record's values followed by [rhs]'s values
 * in a newly allocated array. Neither operand is modified.
 */
public operator fun plus(rhs: Record): Record = Record(values + rhs.values)

/**
 * Returns a new [Record] backed by a fresh copy of the value array. The copy is shallow: the
 * array slots are duplicated, the values they reference are shared.
 */
public fun copy(): Record {
    val duplicated = values.copyOf()
    return Record(duplicated)
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ internal class RelJoinInner(
override val lhs: Operator.Relation,
override val rhs: Operator.Relation,
override val condition: Operator.Expr
) : RelJoin() {
override fun getOutputRecord(result: Boolean, lhs: Record, rhs: Record): Record {
return lhs + rhs
) : RelJoinNestedLoop() {
/**
 * INNER JOIN keeps a pair only when the join condition held: returns the concatenated
 * record for a match and `null` (meaning "emit nothing for this pair") otherwise.
 */
override fun join(condition: Boolean, lhs: Record, rhs: Record): Record? {
    if (!condition) {
        return null
    }
    return lhs + rhs
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,17 @@ package org.partiql.eval.internal.operator.rel

import org.partiql.eval.internal.Record
import org.partiql.eval.internal.operator.Operator
import org.partiql.value.PartiQLValue
import org.partiql.value.PartiQLValueExperimental
import org.partiql.value.StructValue
import org.partiql.value.nullValue
import org.partiql.value.structValue

internal class RelJoinLeft(
override val lhs: Operator.Relation,
override val rhs: Operator.Relation,
override val condition: Operator.Expr
) : RelJoin() {
) : RelJoinNestedLoop() {

override fun getOutputRecord(result: Boolean, lhs: Record, rhs: Record): Record {
if (result.not()) {
override fun join(condition: Boolean, lhs: Record, rhs: Record): Record {
if (condition.not()) {
rhs.padNull()
}
return lhs + rhs
}

@OptIn(PartiQLValueExperimental::class)
private fun Record.padNull() {
this.values.indices.forEach { index ->
this.values[index] = values[index].padNull()
}
}

@OptIn(PartiQLValueExperimental::class)
private fun PartiQLValue.padNull(): PartiQLValue {
return when (this) {
is StructValue<*> -> {
val newFields = this.fields?.map { it.first to nullValue() }
structValue(newFields)
}
else -> nullValue()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.partiql.eval.internal.operator.rel

import org.partiql.eval.internal.Record
import org.partiql.eval.internal.operator.Operator
import org.partiql.value.BoolValue
import org.partiql.value.PartiQLValue
import org.partiql.value.PartiQLValueExperimental
import org.partiql.value.StructValue
import org.partiql.value.nullValue
import org.partiql.value.structValue

/**
 * Shared nested-loop driver for the JOIN operators.
 *
 * The [rhs] relation is the outer loop: its current row is cached in [rhsRecord]. The [lhs]
 * relation is the inner loop: it is fully iterated (and then closed and re-opened) once per
 * outer row. For every pair the [condition] is evaluated on `lhsRecord + rhsRecord` and the
 * subclass-specific [join] decides which record, if any, is emitted for that pair.
 */
internal abstract class RelJoinNestedLoop : Operator.Relation {

    abstract val lhs: Operator.Relation
    abstract val rhs: Operator.Relation
    abstract val condition: Operator.Expr

    /** Current row of the outer ([rhs]) loop; `null` when [rhs] produced no row. */
    private var rhsRecord: Record? = null

    /** Opens both inputs and primes the outer loop with the first [rhs] row. */
    override fun open() {
        lhs.open()
        rhs.open()
        rhsRecord = rhs.next()
    }

    /**
     * Produces the output for a single (lhs, rhs) pair.
     *
     * @param condition whether the join condition evaluated to boolean TRUE for this pair.
     * @param lhs the current inner-loop row.
     * @param rhs a private copy of the current outer-loop row; implementations may mutate it.
     * @return the record to emit, or `null` to emit nothing for this pair.
     */
    abstract fun join(condition: Boolean, lhs: Record, rhs: Record): Record?

    /**
     * Returns the next joined record, or `null` once the outer ([rhs]) relation is exhausted.
     */
    @OptIn(PartiQLValueExperimental::class)
    override fun next(): Record? {
        var lhsRecord = lhs.next()
        while (true) {
            // Inner loop exhausted: advance the outer row and rewind the inner relation.
            if (lhsRecord == null) {
                lhs.close()
                rhsRecord = rhs.next() ?: return null
                lhs.open()
                lhsRecord = lhs.next()
            }
            val outer = rhsRecord
            if (lhsRecord != null && outer != null) {
                val result = condition.eval(lhsRecord + outer)
                // Hand join() a copy of the cached outer row: subclasses pad records in place
                // (see padNull), which must not leak into the pairs evaluated afterwards.
                val joined = join(result.isTrue(), lhsRecord, outer.copy())
                if (joined != null) {
                    return joined
                }
            }
            // Nothing was emitted for this pair, so move on to the next inner row. Without this
            // step the same pair would be re-evaluated forever.
            if (lhsRecord != null) {
                lhsRecord = lhs.next()
            }
        }
    }

    /** Closes both inputs. */
    override fun close() {
        lhs.close()
        rhs.close()
    }

    /** True only for a [BoolValue] whose value is `true`; null and non-boolean values count as false. */
    @OptIn(PartiQLValueExperimental::class)
    private fun PartiQLValue.isTrue(): Boolean {
        return this is BoolValue && this.value == true
    }

    /** Replaces, in place, every value of this record with its null-padded form. */
    @OptIn(PartiQLValueExperimental::class)
    internal fun Record.padNull() {
        this.values.indices.forEach { index ->
            this.values[index] = values[index].padNull()
        }
    }

    /**
     * Null-padded form of a value: a struct keeps its field names with every field set to null,
     * any other value becomes a plain null value.
     */
    @OptIn(PartiQLValueExperimental::class)
    private fun PartiQLValue.padNull(): PartiQLValue {
        return when (this) {
            is StructValue<*> -> {
                val newFields = this.fields?.map { it.first to nullValue() }
                structValue(newFields)
            }
            else -> nullValue()
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.partiql.eval.internal.operator.rel

import org.partiql.eval.internal.Record
import org.partiql.eval.internal.operator.Operator

/**
 * A deliberately simple FULL OUTER JOIN built on the nested-loop driver of [RelJoinNestedLoop].
 *
 * Every (lhs, rhs) pair is handled as follows:
 * - Condition TRUE: the two records are emitted merged, exactly like an INNER JOIN.
 * - Condition FALSE: two records have to be emitted, one after the other.
 *   1. [join] stashes untouched copies of both sides in [previousLhs] / [previousRhs], pads the LHS
 *      with nulls and emits `paddedLhs + rhs` (the RIGHT OUTER JOIN shaped row).
 *   2. The following call to [next] sees the stash, pads the stashed RHS with nulls, emits
 *      `lhs + paddedRhs` (the LEFT OUTER JOIN shaped row) and clears the stash.
 *
 * Only one stashed pair is held at a time; pairs are visited by the nested loop of the base class.
 */
internal class RelJoinOuterFull(
    override val lhs: Operator.Relation,
    override val rhs: Operator.Relation,
    override val condition: Operator.Expr
) : RelJoinNestedLoop() {

    // Copies of the last non-matching pair, waiting for their second output row.
    private var previousLhs: Record? = null
    private var previousRhs: Record? = null

    /**
     * Emits the pending second row of a non-matching pair when one is stashed; otherwise defers to
     * the nested-loop driver for the next pair.
     */
    override fun next(): Record? {
        val pendingLhs = previousLhs
        val pendingRhs = previousRhs
        if (pendingLhs == null || pendingRhs == null) {
            return super.next()
        }
        pendingRhs.padNull()
        val secondRow = pendingLhs + pendingRhs
        previousLhs = null
        previousRhs = null
        return secondRow
    }

    /**
     * Returns the first (or only) output row for a pair.
     *
     * On a match both records are merged unchanged and any stash is cleared. On a mismatch the
     * unpadded pair is stashed for [next], then [lhs] is padded in place and merged with [rhs].
     */
    override fun join(condition: Boolean, lhs: Record, rhs: Record): Record {
        if (condition) {
            previousLhs = null
            previousRhs = null
        } else {
            // Copy before padding so the stashed LHS keeps its real values.
            previousLhs = lhs.copy()
            previousRhs = rhs.copy()
            lhs.padNull()
        }
        return lhs + rhs
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.partiql.eval.internal.operator.rel

import org.partiql.eval.internal.Record
import org.partiql.eval.internal.operator.Operator

/**
 * RIGHT OUTER JOIN on top of the nested-loop driver of [RelJoinNestedLoop].
 *
 * The operands are kept in their declared order (no lhs/rhs swap), so the join condition is
 * evaluated on `lhs + rhs` and the emitted record is laid out as `lhs + rhs`, the same layout
 * produced by the INNER and LEFT joins. Swapping the inputs would evaluate the condition and
 * lay out the output as (right, left) and would pad the right-hand row, which is the row a
 * RIGHT JOIN has to preserve.
 *
 * @param lhs left input relation.
 * @param rhs right input relation.
 * @param condition the ON condition.
 */
internal class RelJoinRight(
    override val lhs: Operator.Relation,
    override val rhs: Operator.Relation,
    override val condition: Operator.Expr
) : RelJoinNestedLoop() {

    /**
     * Emits `lhs + rhs` for the pair. When the condition did not hold, the left-hand record is
     * first padded with nulls in place so that the right-hand row is preserved as-is.
     */
    override fun join(condition: Boolean, lhs: Record, rhs: Record): Record {
        if (!condition) {
            lhs.padNull()
        }
        return lhs + rhs
    }
}
Loading

0 comments on commit 9b73804

Please sign in to comment.