Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RelSort and comparator between PartiQLValue #1343

Merged
merged 6 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 45 additions & 31 deletions partiql-eval/src/main/kotlin/org/partiql/eval/internal/Compiler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.partiql.eval.internal.operator.rel.RelJoinRight
import org.partiql.eval.internal.operator.rel.RelProject
import org.partiql.eval.internal.operator.rel.RelScan
import org.partiql.eval.internal.operator.rel.RelScanIndexed
import org.partiql.eval.internal.operator.rel.RelSort
import org.partiql.eval.internal.operator.rex.ExprCase
import org.partiql.eval.internal.operator.rex.ExprCollection
import org.partiql.eval.internal.operator.rex.ExprGlobal
Expand All @@ -31,127 +32,130 @@ import org.partiql.plan.Statement
import org.partiql.plan.visitor.PlanBaseVisitor
import org.partiql.spi.connector.ConnectorBindings
import org.partiql.spi.connector.ConnectorObjectPath
import org.partiql.types.StaticType
import org.partiql.value.PartiQLValueExperimental
import java.lang.IllegalStateException

internal class Compiler(
private val plan: PartiQLPlan,
private val catalogs: Map<String, ConnectorBindings>,
) : PlanBaseVisitor<Operator, Unit>() {
) : PlanBaseVisitor<Operator, StaticType?>() { // Current ctx is a StaticType to differentiate between collection types for Rex.Op.Collection

fun compile(): Operator.Expr {
return visitPartiQLPlan(plan, Unit)
return visitPartiQLPlan(plan, null)
}

override fun defaultReturn(node: PlanNode, ctx: Unit): Operator {
override fun defaultReturn(node: PlanNode, ctx: StaticType?): Operator {
TODO("Not yet implemented")
}

override fun visitRexOpErr(node: Rex.Op.Err, ctx: Unit): Operator {
override fun visitRexOpErr(node: Rex.Op.Err, ctx: StaticType?): Operator {
throw IllegalStateException(node.message)
}

override fun visitRelOpErr(node: Rel.Op.Err, ctx: Unit): Operator {
override fun visitRelOpErr(node: Rel.Op.Err, ctx: StaticType?): Operator {
throw IllegalStateException(node.message)
}

// TODO: Re-look at
override fun visitPartiQLPlan(node: PartiQLPlan, ctx: Unit): Operator.Expr {
override fun visitPartiQLPlan(node: PartiQLPlan, ctx: StaticType?): Operator.Expr {
return visitStatement(node.statement, ctx) as Operator.Expr
}

// TODO: Re-look at
override fun visitStatementQuery(node: Statement.Query, ctx: Unit): Operator.Expr {
override fun visitStatementQuery(node: Statement.Query, ctx: StaticType?): Operator.Expr {
return visitRex(node.root, ctx)
}

// REX

override fun visitRex(node: Rex, ctx: Unit): Operator.Expr {
return super.visitRexOp(node.op, ctx) as Operator.Expr
override fun visitRex(node: Rex, ctx: StaticType?): Operator.Expr {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current plan, the rex node's type is only accessible on the Rex and not the inner classes (e.g. Rex.Op.Collection). Passing it along in the context allows the visitor for the inner classes to access the type, which is needed by some nodes like Rex.Op.Collection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would really like for our internal IRs to keep the types in the node by defining a type field as part of the PlanNode base class. This would require more codegen work which probably isn't worth the time at the moment. Please let me know if you have an additional ideas here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I'd need to play around with the generated code a bit to see if there's a better way. Only concern I'd have is that the type field for Rex is a different type (i.e. StaticType) than the type field for Rel (i.e. Rel.Type which has schema and props). Probably that representation in the PlanNode base class would be an enum that could be either of those type definitions?

Anyways, perhaps we should tackle this in another issue/PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you check out PlanTyper, you'll see we use two different visitors since the type is parameterized. There are many ways around this, but really I wish we didn't have the union types. What would be better is for the code generator to support abstract fields.

The best situation however would be handwritten nodes with annotation based generation like Lombok. This would give us the most control.

Copy link
Member Author

@alancai98 alancai98 Feb 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you check out PlanTyper, you'll see we use two different visitors since the type is parameterized.

Oh I see. The separate visitors for Rel and Rex doesn't seem too cumbersome.

The best situation however would be handwritten nodes with annotation based generation like Lombok. This would give us the most control.

Agree w/ a mix of code-generated nodes and handwritten nodes would give us the most flexibility when it comes to these interfaces.

return super.visitRexOp(node.op, node.type) as Operator.Expr
}

override fun visitRexOpCollection(node: Rex.Op.Collection, ctx: Unit): Operator {
override fun visitRexOpCollection(node: Rex.Op.Collection, ctx: StaticType?): Operator {
val values = node.values.map { visitRex(it, ctx) }
return ExprCollection(values)
val type = ctx ?: error("No type provided in ctx")
return ExprCollection(values, type)
}
override fun visitRexOpStruct(node: Rex.Op.Struct, ctx: Unit): Operator {
override fun visitRexOpStruct(node: Rex.Op.Struct, ctx: StaticType?): Operator {
val fields = node.fields.map {
ExprStruct.Field(visitRex(it.k, ctx), visitRex(it.v, ctx))
}
return ExprStruct(fields)
}

override fun visitRexOpSelect(node: Rex.Op.Select, ctx: Unit): Operator {
override fun visitRexOpSelect(node: Rex.Op.Select, ctx: StaticType?): Operator {
val rel = visitRel(node.rel, ctx)
val ordered = node.rel.type.props.contains(Rel.Prop.ORDERED)
val constructor = visitRex(node.constructor, ctx)
return ExprSelect(rel, constructor)
return ExprSelect(rel, constructor, ordered)
}

override fun visitRexOpPivot(node: Rex.Op.Pivot, ctx: Unit): Operator {
override fun visitRexOpPivot(node: Rex.Op.Pivot, ctx: StaticType?): Operator {
val rel = visitRel(node.rel, ctx)
val key = visitRex(node.key, ctx)
val value = visitRex(node.value, ctx)
return ExprPivot(rel, key, value)
}
override fun visitRexOpVar(node: Rex.Op.Var, ctx: Unit): Operator {
override fun visitRexOpVar(node: Rex.Op.Var, ctx: StaticType?): Operator {
return ExprVar(node.ref)
}

override fun visitRexOpGlobal(node: Rex.Op.Global, ctx: Unit): Operator {
override fun visitRexOpGlobal(node: Rex.Op.Global, ctx: StaticType?): Operator {
val catalog = plan.catalogs[node.ref.catalog]
val symbol = catalog.symbols[node.ref.symbol]
val path = ConnectorObjectPath(symbol.path)
val bindings = catalogs[catalog.name]!!
return ExprGlobal(path, bindings)
}

override fun visitRexOpPathKey(node: Rex.Op.Path.Key, ctx: Unit): Operator {
override fun visitRexOpPathKey(node: Rex.Op.Path.Key, ctx: StaticType?): Operator {
val root = visitRex(node.root, ctx)
val key = visitRex(node.key, ctx)
return ExprPathKey(root, key)
}

override fun visitRexOpPathSymbol(node: Rex.Op.Path.Symbol, ctx: Unit): Operator {
override fun visitRexOpPathSymbol(node: Rex.Op.Path.Symbol, ctx: StaticType?): Operator {
val root = visitRex(node.root, ctx)
val symbol = node.key
return ExprPathSymbol(root, symbol)
}

override fun visitRexOpPathIndex(node: Rex.Op.Path.Index, ctx: Unit): Operator {
override fun visitRexOpPathIndex(node: Rex.Op.Path.Index, ctx: StaticType?): Operator {
val root = visitRex(node.root, ctx)
val index = visitRex(node.key, ctx)
return ExprPathIndex(root, index)
}

// REL

override fun visitRel(node: Rel, ctx: Unit): Operator.Relation {
override fun visitRel(node: Rel, ctx: StaticType?): Operator.Relation {
return super.visitRelOp(node.op, ctx) as Operator.Relation
}

override fun visitRelOpScan(node: Rel.Op.Scan, ctx: Unit): Operator {
override fun visitRelOpScan(node: Rel.Op.Scan, ctx: StaticType?): Operator {
val rex = visitRex(node.rex, ctx)
return RelScan(rex)
}

override fun visitRelOpProject(node: Rel.Op.Project, ctx: Unit): Operator {
override fun visitRelOpProject(node: Rel.Op.Project, ctx: StaticType?): Operator {
val input = visitRel(node.input, ctx)
val projections = node.projections.map { visitRex(it, ctx) }
return RelProject(input, projections)
}

override fun visitRelOpScanIndexed(node: Rel.Op.ScanIndexed, ctx: Unit): Operator {
override fun visitRelOpScanIndexed(node: Rel.Op.ScanIndexed, ctx: StaticType?): Operator {
val rex = visitRex(node.rex, ctx)
return RelScanIndexed(rex)
}

override fun visitRexOpTupleUnion(node: Rex.Op.TupleUnion, ctx: Unit): Operator {
override fun visitRexOpTupleUnion(node: Rex.Op.TupleUnion, ctx: StaticType?): Operator {
val args = node.args.map { visitRex(it, ctx) }.toTypedArray()
return ExprTupleUnion(args)
}

override fun visitRelOpJoin(node: Rel.Op.Join, ctx: Unit): Operator {
override fun visitRelOpJoin(node: Rel.Op.Join, ctx: StaticType?): Operator {
val lhs = visitRel(node.lhs, ctx)
val rhs = visitRel(node.rhs, ctx)
val condition = visitRex(node.rex, ctx)
Expand All @@ -163,7 +167,7 @@ internal class Compiler(
}
}

override fun visitRexOpCase(node: Rex.Op.Case, ctx: Unit): Operator {
override fun visitRexOpCase(node: Rex.Op.Case, ctx: StaticType?): Operator {
val branches = node.branches.map { branch ->
visitRex(branch.condition, ctx) to visitRex(branch.rex, ctx)
}
Expand All @@ -172,23 +176,33 @@ internal class Compiler(
}

@OptIn(PartiQLValueExperimental::class)
override fun visitRexOpLit(node: Rex.Op.Lit, ctx: Unit): Operator {
override fun visitRexOpLit(node: Rex.Op.Lit, ctx: StaticType?): Operator {
return ExprLiteral(node.value)
}

override fun visitRelOpDistinct(node: Rel.Op.Distinct, ctx: Unit): Operator {
override fun visitRelOpDistinct(node: Rel.Op.Distinct, ctx: StaticType?): Operator {
val input = visitRel(node.input, ctx)
return RelDistinct(input)
}

override fun visitRelOpFilter(node: Rel.Op.Filter, ctx: Unit): Operator {
override fun visitRelOpFilter(node: Rel.Op.Filter, ctx: StaticType?): Operator {
val input = visitRel(node.input, ctx)
val condition = visitRex(node.predicate, ctx)
return RelFilter(input, condition)
}

override fun visitRelOpExclude(node: Rel.Op.Exclude, ctx: Unit): Operator {
override fun visitRelOpExclude(node: Rel.Op.Exclude, ctx: StaticType?): Operator {
val input = visitRel(node.input, ctx)
return RelExclude(input, node.paths)
}

override fun visitRelOpSort(node: Rel.Op.Sort, ctx: StaticType?): Operator {
val input = visitRel(node.input, ctx)
val compiledSpecs = node.specs.map { spec ->
val expr = visitRex(spec.rex, ctx)
val order = spec.order
expr to order
}
return RelSort(input, compiledSpecs)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.partiql.eval.internal.operator.rel

import org.partiql.eval.internal.Record
import org.partiql.eval.internal.operator.Operator
import org.partiql.eval.internal.util.PartiQLValueComparator
import org.partiql.plan.Rel
import org.partiql.value.PartiQLValueExperimental

internal class RelSort(
val input: Operator.Relation,
val specs: List<Pair<Operator.Expr, Rel.Op.Sort.Order>>

) : Operator.Relation {
private var records: MutableList<Record> = mutableListOf()
private var init: Boolean = false

private val nullsFirstComparator = PartiQLValueComparator(nullOrder = PartiQLValueComparator.NullOrder.FIRST)
private val nullsLastComparator = PartiQLValueComparator(nullOrder = PartiQLValueComparator.NullOrder.LAST)

override fun open() {
input.open()
init = false
records = mutableListOf()
}

@OptIn(PartiQLValueExperimental::class)
val comparator = object : Comparator<Record> {
override fun compare(l: Record, r: Record): Int {
specs.forEach { spec ->
val lVal = spec.first.eval(l)
val rVal = spec.first.eval(r)

// DESC_NULLS_FIRST(l, r) == ASC_NULLS_LAST(r, l)
// DESC_NULLS_LAST(l, r) == ASC_NULLS_FIRST(r, l)
val cmpResult = when (spec.second) {
Rel.Op.Sort.Order.ASC_NULLS_FIRST -> nullsFirstComparator.compare(lVal, rVal)
Rel.Op.Sort.Order.ASC_NULLS_LAST -> nullsLastComparator.compare(lVal, rVal)
Rel.Op.Sort.Order.DESC_NULLS_FIRST -> nullsLastComparator.compare(rVal, lVal)
Rel.Op.Sort.Order.DESC_NULLS_LAST -> nullsFirstComparator.compare(rVal, lVal)
}
if (cmpResult != 0) {
return cmpResult
}
}
return 0 // Equal
}
}

override fun next(): Record? {
if (!init) {
while (true) {
val row = input.next() ?: break
records.add(row)
}
records.sortWith(comparator)
}
return when (records.isEmpty()) {
true -> null
else -> records.removeAt(0)
}
}

override fun close() {
input.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,28 @@ package org.partiql.eval.internal.operator.rex

import org.partiql.eval.internal.Record
import org.partiql.eval.internal.operator.Operator
import org.partiql.types.BagType
import org.partiql.types.ListType
import org.partiql.types.SexpType
import org.partiql.types.StaticType
import org.partiql.value.PartiQLValue
import org.partiql.value.PartiQLValueExperimental
import org.partiql.value.bagValue
import org.partiql.value.listValue
import org.partiql.value.sexpValue

internal class ExprCollection(
private val values: List<Operator.Expr>
private val values: List<Operator.Expr>,
private val type: StaticType
) : Operator.Expr {

@PartiQLValueExperimental
override fun eval(record: Record): PartiQLValue {
return bagValue(
values.map { it.eval(record) }
)
return when (type) {
is BagType -> bagValue(values.map { it.eval(record) })
is SexpType -> sexpValue(values.map { it.eval(record) })
is ListType -> listValue(values.map { it.eval(record) })
else -> error("Unsupported type for collection $type")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.partiql.eval.internal.operator.Operator
import org.partiql.value.PartiQLValue
import org.partiql.value.PartiQLValueExperimental
import org.partiql.value.bagValue
import org.partiql.value.listValue

/**
* Invoke the constructor over all inputs.
Expand All @@ -15,6 +16,7 @@ import org.partiql.value.bagValue
internal class ExprSelect(
val input: Operator.Relation,
val constructor: Operator.Expr,
val ordered: Boolean
) : Operator.Expr {

/**
Expand All @@ -31,6 +33,9 @@ internal class ExprSelect(
elements.add(e)
}
input.close()
return bagValue(elements)
return when (ordered) {
true -> listValue(elements)
false -> bagValue(elements)
}
}
}
Loading
Loading