-
Notifications
You must be signed in to change notification settings - Fork 34
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add PPL Between functionality (#758)
* Implement between Signed-off-by: Hendrik Saly <hendrik.saly@eliatra.com> * add integration test for between command to the straight and NOT usage Signed-off-by: Jens Schmidt <jens.schmidt@eliatra.com> * Add docs Signed-off-by: Hendrik Saly <hendrik.saly@eliatra.com> * Add proposed syntax to ppl planning Signed-off-by: Hendrik Saly <hendrik.saly@eliatra.com> * adjust gitignore Signed-off-by: Jens Schmidt <jens.schmidt@eliatra.com> * adjust gitignore: add spark-bin Signed-off-by: Jens Schmidt <jens.schmidt@eliatra.com> * clean .gitignore: remove local adjustments Signed-off-by: Jens Schmidt <jens.schmidt@eliatra.com> * update integration test to use between keyword Signed-off-by: Jens Schmidt <jens.schmidt@eliatra.com> * Move to comparisonExpression Signed-off-by: Hendrik Saly <hendrik.saly@eliatra.com> * Added to keywordsCanBeId Signed-off-by: Hendrik Saly <hendrik.saly@eliatra.com> * Update docs Signed-off-by: Hendrik Saly <hendrik.saly@eliatra.com> * Add additional tests Signed-off-by: Hendrik Saly <hendrik.saly@eliatra.com> * Move to comparisonExpression -2- Signed-off-by: Hendrik Saly <hendrik.saly@eliatra.com> * Fix docs Signed-off-by: Hendrik Saly <hendrik.saly@eliatra.com> * Added IT tests Signed-off-by: Hendrik Saly <hendrik.saly@eliatra.com> --------- Signed-off-by: Hendrik Saly <hendrik.saly@eliatra.com> Signed-off-by: Jens Schmidt <jens.schmidt@eliatra.com> Co-authored-by: Hendrik Saly <hendrik.saly@eliatra.com>
- Loading branch information
1 parent
9d2f94d
commit 82770ec
Showing
9 changed files
with
234 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
## between syntax proposal | ||
|
||
1. **Proposed syntax** | ||
- `... | where expr1 [NOT] BETWEEN expr2 AND expr3` | ||
- evaluate if expr1 is [not] in between expr2 and expr3 | ||
- `... | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4] | ||
- `... | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b >= '2024-09-10' and b <= '2025-09-10' | ||
|
||
### New syntax definition in ANTLR | ||
|
||
```ANTLR | ||
logicalExpression | ||
... | ||
| expr1 = functionArg NOT? BETWEEN expr2 = functionArg AND expr3 = functionArg # between | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
136 changes: 136 additions & 0 deletions
136
...st/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBetweenITSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.spark.ppl | ||
|
||
import java.sql.Timestamp | ||
|
||
import org.apache.spark.sql.QueryTest | ||
import org.apache.spark.sql.streaming.StreamTest | ||
|
||
class FlintSparkPPLBetweenITSuite | ||
extends QueryTest | ||
with LogicalPlanTestUtils | ||
with FlintPPLSuite | ||
with StreamTest { | ||
|
||
/** Test table and index name */ | ||
private val testTable = "spark_catalog.default.flint_ppl_test" | ||
private val timeSeriesTestTable = "spark_catalog.default.flint_ppl_timeseries_test" | ||
|
||
override def beforeAll(): Unit = { | ||
super.beforeAll() | ||
|
||
// Create test tables | ||
createPartitionedStateCountryTable(testTable) | ||
createTimeSeriesTable(timeSeriesTestTable) | ||
} | ||
|
||
protected override def afterEach(): Unit = { | ||
super.afterEach() | ||
// Stop all streaming jobs if any | ||
spark.streams.active.foreach { job => | ||
job.stop() | ||
job.awaitTermination() | ||
} | ||
} | ||
|
||
test("test between should return records between two integer values") { | ||
val frame = sql(s""" | ||
| source = $testTable | where age between 20 and 30 | ||
| """.stripMargin) | ||
|
||
val results = frame.collect() | ||
assert(results.length == 3) | ||
assert(frame.columns.length == 6) | ||
|
||
results.foreach(row => { | ||
val age = row.getAs[Int]("age") | ||
assert(age >= 20 && age <= 30, s"Age $age is not between 20 and 30") | ||
}) | ||
} | ||
|
||
test("test between should return records between two integer computed values") { | ||
val frame = sql(s""" | ||
| source = $testTable | where age between 20 + 1 and 30 - 1 | ||
| """.stripMargin) | ||
|
||
val results = frame.collect() | ||
assert(results.length == 1) | ||
assert(frame.columns.length == 6) | ||
|
||
results.foreach(row => { | ||
val age = row.getAs[Int]("age") | ||
assert(age >= 21 && age <= 29, s"Age $age is not between 21 and 29") | ||
}) | ||
} | ||
|
||
test("test between should return records NOT between two integer values") { | ||
val frame = sql(s""" | ||
| source = $testTable | where age NOT between 20 and 30 | ||
| """.stripMargin) | ||
|
||
val results = frame.collect() | ||
assert(results.length == 1) | ||
assert(frame.columns.length == 6) | ||
|
||
results.foreach(row => { | ||
val age = row.getAs[Int]("age") | ||
assert(age < 20 || age > 30, s"Age $age is not between 20 and 30") | ||
}) | ||
} | ||
|
||
test("test between should return records where NOT between two integer values") { | ||
val frame = sql(s""" | ||
| source = $testTable | where NOT age between 20 and 30 | ||
| """.stripMargin) | ||
|
||
val results = frame.collect() | ||
assert(results.length == 1) | ||
assert(frame.columns.length == 6) | ||
|
||
results.foreach(row => { | ||
val age = row.getAs[Int]("age") | ||
assert(age < 20 || age > 30, s"Age $age is not between 20 and 30") | ||
}) | ||
} | ||
|
||
test("test between should return records between two date values") { | ||
val frame = sql(s""" | ||
| source = $timeSeriesTestTable | where time between '2023-10-01 00:01:00' and '2023-10-01 00:10:00' | ||
| """.stripMargin) | ||
|
||
val results = frame.collect() | ||
assert(results.length == 2) | ||
assert(frame.columns.length == 4) | ||
|
||
results.foreach(row => { | ||
val ts = row.getAs[Timestamp]("time") | ||
assert( | ||
!ts.before(Timestamp.valueOf("2023-10-01 00:01:00")) || !ts.after( | ||
Timestamp.valueOf("2023-10-01 00:01:00")), | ||
s"Timestamp $ts is not between '2023-10-01 00:01:00' and '2023-10-01 00:10:00'") | ||
}) | ||
} | ||
|
||
test("test between should return records NOT between two date values") { | ||
val frame = sql(s""" | ||
| source = $timeSeriesTestTable | where time NOT between '2023-10-01 00:01:00' and '2023-10-01 00:10:00' | ||
| """.stripMargin) | ||
|
||
val results = frame.collect() | ||
assert(results.length == 3) | ||
assert(frame.columns.length == 4) | ||
|
||
results.foreach(row => { | ||
val ts = row.getAs[Timestamp]("time") | ||
assert( | ||
ts.before(Timestamp.valueOf("2023-10-01 00:01:00")) || ts.after( | ||
Timestamp.valueOf("2023-10-01 00:01:00")), | ||
s"Timestamp $ts is not between '2023-10-01 00:01:00' and '2023-10-01 00:10:00'") | ||
}) | ||
|
||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
55 changes: 55 additions & 0 deletions
55
...a/org/opensearch/flint/spark/ppl/PPLLogicalPlanBetweenExpressionTranslatorTestSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.spark.ppl | ||
|
||
import org.opensearch.flint.spark.ppl.PlaneUtils.plan | ||
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} | ||
import org.scalatest.matchers.should.Matchers | ||
|
||
import org.apache.spark.SparkFunSuite | ||
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} | ||
import org.apache.spark.sql.catalyst.expressions.{And, GreaterThanOrEqual, LessThanOrEqual, Literal} | ||
import org.apache.spark.sql.catalyst.plans.PlanTest | ||
import org.apache.spark.sql.catalyst.plans.logical._ | ||
|
||
class PPLLogicalPlanBetweenExpressionTranslatorTestSuite | ||
extends SparkFunSuite | ||
with PlanTest | ||
with LogicalPlanTestUtils | ||
with Matchers { | ||
|
||
private val planTransformer = new CatalystQueryPlanVisitor() | ||
private val pplParser = new PPLSyntaxParser() | ||
|
||
test("test between expression") { | ||
// if successful build ppl logical plan and translate to catalyst logical plan | ||
val context = new CatalystPlanContext | ||
val logPlan = { | ||
planTransformer.visit( | ||
plan( | ||
pplParser, | ||
"source = table | where datetime_field between '2024-09-10' and '2024-09-15'"), | ||
context) | ||
} | ||
// SQL: SELECT * FROM table WHERE datetime_field BETWEEN '2024-09-10' AND '2024-09-15' | ||
val star = Seq(UnresolvedStar(None)) | ||
|
||
val datetime_field = UnresolvedAttribute("datetime_field") | ||
val tableRelation = UnresolvedRelation(Seq("table")) | ||
|
||
val lowerBound = Literal("2024-09-10") | ||
val upperBound = Literal("2024-09-15") | ||
val betweenCondition = And( | ||
GreaterThanOrEqual(datetime_field, lowerBound), | ||
LessThanOrEqual(datetime_field, upperBound)) | ||
|
||
val filterPlan = Filter(betweenCondition, tableRelation) | ||
val expectedPlan = Project(star, filterPlan) | ||
|
||
comparePlans(expectedPlan, logPlan, false) | ||
} | ||
|
||
} |