PPL flatten command (#784)
* The flatten command implemented

Signed-off-by: Lukasz Soszynski <lukasz.soszynski@eliatra.com>

* The flatten command integration tests were extended with additional checks for logical plans.

Signed-off-by: Lukasz Soszynski <lukasz.soszynski@eliatra.com>

* Flatten: added more plan-translation tests and integration tests.

Signed-off-by: Lukasz Soszynski <lukasz.soszynski@eliatra.com>

* Flatten command added to command names list.

Signed-off-by: Lukasz Soszynski <lukasz.soszynski@eliatra.com>

---------

Signed-off-by: Lukasz Soszynski <lukasz.soszynski@eliatra.com>
lukasz-soszynski-eliatra authored Oct 31, 2024
1 parent 98962c3 commit b71a9ed
Showing 13 changed files with 829 additions and 2 deletions.
9 changes: 9 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -122,6 +122,15 @@ Assumptions: `a`, `b`, `c`, `d`, `e` are existing fields in `table`
- `source = table | fillnull using a = 101, b = 102`
- `source = table | fillnull using a = concat(b, c), d = 2 * pi() * e`

### Flatten
[See additional command details](ppl-flatten-command.md)
Assumptions: `bridges` and `coor` are existing fields in `table`, and their types are `struct<?,?>` or `array<struct<?,?>>`
- `source = table | flatten bridges`
- `source = table | flatten coor`
- `source = table | flatten bridges | flatten coor`
- `source = table | fields bridges | flatten bridges`
- `source = table | fields country, bridges | flatten bridges | fields country, length | stats avg(length) as avg by country`
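
The last query above chains `flatten` with an aggregation. As a rough illustration of what that pipeline computes, here is a minimal plain-Scala sketch that does not use Spark. The names `FlattenThenStatsSketch`, `Row`, and `Bridge` are hypothetical, and the two rows are taken from the test table in the `flatten` command documentation.

```scala
object FlattenThenStatsSketch {
  // Hypothetical in-memory model of the table; not the Spark representation.
  final case class Bridge(length: Long, name: String)
  final case class Row(country: String, bridges: Seq[Bridge])

  val table: Seq[Row] = Seq(
    Row("England", Seq(Bridge(801, "Tower Bridge"), Bridge(928, "London Bridge"))),
    Row("France", Seq(Bridge(232, "Pont Neuf"), Bridge(160, "Pont Alexandre III"))))

  // fields country, bridges | flatten bridges | fields country, length
  val flattened: Seq[(String, Long)] =
    table.flatMap(row => row.bridges.map(bridge => (row.country, bridge.length)))

  // stats avg(length) as avg by country
  val avgByCountry: Map[String, Double] =
    flattened.groupBy(_._1).map { case (country, pairs) =>
      country -> (pairs.map(_._2).sum.toDouble / pairs.size)
    }
}
```

Because `flatten` produces one row per bridge, the average is taken over individual bridges rather than over arrays.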

```sql
source = table | eval e = eval status_category =
case(a >= 200 AND a < 300, 'Success',
2 changes: 2 additions & 0 deletions docs/ppl-lang/README.md
@@ -31,6 +31,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).
- [`describe command`](PPL-Example-Commands.md/#describe)

- [`fillnull command`](ppl-fillnull-command.md)

- [`flatten command`](ppl-flatten-command.md)

- [`eval command`](ppl-eval-command.md)

90 changes: 90 additions & 0 deletions docs/ppl-lang/ppl-flatten-command.md
@@ -0,0 +1,90 @@
## PPL `flatten` command

### Description
Use the `flatten` command to flatten a field of one of the following types:
- `struct<?,?>`
- `array<struct<?,?>>`


### Syntax
`flatten <field>`

* field: the field to be flattened. It must be of a supported type.

### Test table
#### Schema
| col\_name | data\_type |
|-----------|-------------------------------------------------|
| \_time | string |
| bridges | array\<struct\<length:bigint,name:string\>\> |
| city | string |
| coor | struct\<alt:bigint,lat:double,long:double\> |
| country | string |
#### Data
| \_time | bridges | city | coor | country |
|---------------------|----------------------------------------------|---------|------------------------|---------------|
| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | {35, 51.5074, -0.1278} | England |
| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | {35, 48.8566, 2.3522} | France |
| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | {2, 45.4408, 12.3155} | Italy |
| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | {200, 50.0755, 14.4378}| Czech Republic|
| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| {96, 47.4979, 19.0402} | Hungary |
| 1990-09-13T12:00:00 | NULL | Warsaw | NULL | Poland |



### Example 1: flatten struct
This example shows how to flatten a struct field.
PPL query:
- `source=table | flatten coor`

| \_time | bridges | city | country | alt | lat | long |
|---------------------|----------------------------------------------|---------|---------------|-----|--------|--------|
| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074| -0.1278|
| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566| 2.3522 |
| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408| 12.3155|
| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755| 14.4378|
| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979| 19.0402|
| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL| NULL | NULL |
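
The result above can be read as: the `coor` column is replaced by one column per struct member, and a NULL struct yields NULL in each of those columns. A minimal plain-Scala sketch of that reading follows. It is not the Spark implementation; the names `FlattenStructSketch`, `Row`, `FlatRow`, and `flattenCoor` are hypothetical, and only a subset of the columns is modelled.

```scala
object FlattenStructSketch {
  // Hypothetical row model; Option stands in for a nullable column.
  final case class Coor(alt: Long, lat: Double, long: Double)
  final case class Row(city: String, country: String, coor: Option[Coor])
  final case class FlatRow(
      city: String,
      country: String,
      alt: Option[Long],
      lat: Option[Double],
      long: Option[Double])

  // Replace the struct column with its members, appended after the remaining columns.
  def flattenCoor(row: Row): FlatRow =
    FlatRow(row.city, row.country, row.coor.map(_.alt), row.coor.map(_.lat), row.coor.map(_.long))

  val rows: Seq[Row] = Seq(
    Row("London", "England", Some(Coor(35, 51.5074, -0.1278))),
    Row("Warsaw", "Poland", None))

  val flattened: Seq[FlatRow] = rows.map(flattenCoor)
}
```

The number of rows is unchanged here: flattening a struct only widens each row.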



### Example 2: flatten array

This example shows how to flatten a field that is an array of structs.

PPL query:
- `source=table | flatten bridges`

| \_time | city | coor | country | length | name |
|---------------------|---------|------------------------|---------------|--------|-------------------|
| 2024-09-13T12:00:00 | London | {35, 51.5074, -0.1278} | England | 801 | Tower Bridge |
| 2024-09-13T12:00:00 | London | {35, 51.5074, -0.1278} | England | 928 | London Bridge |
| 2024-09-13T12:00:00 | Paris | {35, 48.8566, 2.3522} | France | 232 | Pont Neuf |
| 2024-09-13T12:00:00 | Paris | {35, 48.8566, 2.3522} | France | 160 | Pont Alexandre III|
| 2024-09-13T12:00:00 | Venice | {2, 45.4408, 12.3155} | Italy | 48 | Rialto Bridge |
| 2024-09-13T12:00:00 | Venice | {2, 45.4408, 12.3155} | Italy | 11 | Bridge of Sighs |
| 2024-09-13T12:00:00 | Prague | {200, 50.0755, 14.4378}| Czech Republic| 516 | Charles Bridge |
| 2024-09-13T12:00:00 | Prague | {200, 50.0755, 14.4378}| Czech Republic| 343 | Legion Bridge |
| 2024-09-13T12:00:00 | Budapest| {96, 47.4979, 19.0402} | Hungary | 375 | Chain Bridge |
| 2024-09-13T12:00:00 | Budapest| {96, 47.4979, 19.0402} | Hungary | 333 | Liberty Bridge |
| 1990-09-13T12:00:00 | Warsaw | NULL | Poland | NULL | NULL |
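
In the result above, each array element becomes its own row, and the row with a NULL array is kept as a single row with NULL in the new columns. A minimal plain-Scala sketch of that behaviour follows. It is not the Spark implementation; the names `FlattenArraySketch`, `Row`, `FlatRow`, and `flattenBridges` are hypothetical. Treating an empty array like a NULL array is an assumption of this sketch and is not stated in the documentation.

```scala
object FlattenArraySketch {
  // Hypothetical row model; Option stands in for a nullable column.
  final case class Bridge(length: Long, name: String)
  final case class Row(city: String, bridges: Option[Seq[Bridge]])
  final case class FlatRow(city: String, length: Option[Long], name: Option[String])

  def flattenBridges(row: Row): Seq[FlatRow] = row.bridges match {
    // One output row per array element, with the struct members as columns.
    case Some(bridges) if bridges.nonEmpty =>
      bridges.map(b => FlatRow(row.city, Some(b.length), Some(b.name)))
    // NULL array (and, by assumption, an empty array): keep the row, fill with NULLs.
    case _ =>
      Seq(FlatRow(row.city, None, None))
  }

  val rows: Seq[Row] = Seq(
    Row("Venice", Some(Seq(Bridge(48, "Rialto Bridge"), Bridge(11, "Bridge of Sighs")))),
    Row("Warsaw", None))

  val flattened: Seq[FlatRow] = rows.flatMap(flattenBridges)
}
```

Unlike the struct case, flattening an array can change the row count: two input rows become three output rows here.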


### Example 3: flatten array and struct
This example shows how to flatten multiple fields.
PPL query:
- `source=table | flatten bridges | flatten coor`

| \_time | city | country | length | name | alt | lat | long |
|---------------------|---------|---------------|--------|-------------------|------|--------|--------|
| 2024-09-13T12:00:00 | London | England | 801 | Tower Bridge | 35 | 51.5074| -0.1278|
| 2024-09-13T12:00:00 | London | England | 928 | London Bridge | 35 | 51.5074| -0.1278|
| 2024-09-13T12:00:00 | Paris | France | 232 | Pont Neuf | 35 | 48.8566| 2.3522 |
| 2024-09-13T12:00:00 | Paris | France | 160 | Pont Alexandre III| 35 | 48.8566| 2.3522 |
| 2024-09-13T12:00:00 | Venice | Italy | 48 | Rialto Bridge | 2 | 45.4408| 12.3155|
| 2024-09-13T12:00:00 | Venice | Italy | 11 | Bridge of Sighs | 2 | 45.4408| 12.3155|
| 2024-09-13T12:00:00 | Prague | Czech Republic| 516 | Charles Bridge | 200 | 50.0755| 14.4378|
| 2024-09-13T12:00:00 | Prague | Czech Republic| 343 | Legion Bridge | 200 | 50.0755| 14.4378|
| 2024-09-13T12:00:00 | Budapest| Hungary | 375 | Chain Bridge | 96 | 47.4979| 19.0402|
| 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402|
| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL |

@@ -5,7 +5,7 @@

package org.opensearch.flint.spark

-import java.nio.file.{Files, Paths}
+import java.nio.file.{Files, Path, Paths}
import java.util.Comparator
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture}

@@ -534,6 +534,28 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
|""".stripMargin)
}

protected def createMultiValueStructTable(testTable: String): Unit = {
// CSV doesn't support struct fields, so the table is stored as JSON
sql(s"""
| CREATE TABLE $testTable
| (
| int_col INT,
| multi_value Array<STRUCT<name: STRING, value: INT>>
| )
| USING JSON
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| SELECT /*+ COALESCE(1) */ *
| FROM VALUES
| ( 1, array(STRUCT("1_one", 1), STRUCT(null, 11), STRUCT("1_three", null)) ),
| ( 2, array(STRUCT("2_Monday", 2), null) ),
| ( 3, array(STRUCT("3_third", 3), STRUCT("3_4th", 4)) ),
| ( 4, null )
|""".stripMargin)
}

protected def createTableIssue112(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable (
@@ -695,4 +717,100 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| (9, '2001:db8::ff00:12:', true, false)
| """.stripMargin)
}

protected def createNestedJsonContentTable(tempFile: Path, testTable: String): Unit = {
val json =
"""
|[
| {
| "_time": "2024-09-13T12:00:00",
| "bridges": [
| {"name": "Tower Bridge", "length": 801},
| {"name": "London Bridge", "length": 928}
| ],
| "city": "London",
| "country": "England",
| "coor": {
| "lat": 51.5074,
| "long": -0.1278,
| "alt": 35
| }
| },
| {
| "_time": "2024-09-13T12:00:00",
| "bridges": [
| {"name": "Pont Neuf", "length": 232},
| {"name": "Pont Alexandre III", "length": 160}
| ],
| "city": "Paris",
| "country": "France",
| "coor": {
| "lat": 48.8566,
| "long": 2.3522,
| "alt": 35
| }
| },
| {
| "_time": "2024-09-13T12:00:00",
| "bridges": [
| {"name": "Rialto Bridge", "length": 48},
| {"name": "Bridge of Sighs", "length": 11}
| ],
| "city": "Venice",
| "country": "Italy",
| "coor": {
| "lat": 45.4408,
| "long": 12.3155,
| "alt": 2
| }
| },
| {
| "_time": "2024-09-13T12:00:00",
| "bridges": [
| {"name": "Charles Bridge", "length": 516},
| {"name": "Legion Bridge", "length": 343}
| ],
| "city": "Prague",
| "country": "Czech Republic",
| "coor": {
| "lat": 50.0755,
| "long": 14.4378,
| "alt": 200
| }
| },
| {
| "_time": "2024-09-13T12:00:00",
| "bridges": [
| {"name": "Chain Bridge", "length": 375},
| {"name": "Liberty Bridge", "length": 333}
| ],
| "city": "Budapest",
| "country": "Hungary",
| "coor": {
| "lat": 47.4979,
| "long": 19.0402,
| "alt": 96
| }
| },
| {
| "_time": "1990-09-13T12:00:00",
| "bridges": null,
| "city": "Warsaw",
| "country": "Poland",
| "coor": null
| }
|]
|""".stripMargin
// Use the caller-supplied temp file instead of shadowing the parameter with a new one.
val absolutePath = tempFile.toAbsolutePath.toString
Files.write(tempFile, json.getBytes)
sql(s"""
| CREATE TEMPORARY VIEW $testTable
| USING org.apache.spark.sql.json
| OPTIONS (
| path "$absolutePath",
| multiLine true
| );
|""".stripMargin)
}
}
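
The helper above writes its JSON fixture to a temporary file and points a temporary view at it, so the file presumably has to outlive the queries and be removed afterwards. A minimal sketch of that lifecycle using only `java.nio` follows. The names `TempJsonFileSketch`, `writeJson`, and `withTempJson` are hypothetical and are not part of `FlintSparkSuite`.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object TempJsonFileSketch {
  // Write the JSON text to the given file and return its absolute path,
  // which is the value a `path "..."` option would need.
  def writeJson(tempFile: Path, json: String): String = {
    Files.write(tempFile, json.getBytes(StandardCharsets.UTF_8))
    tempFile.toAbsolutePath.toString
  }

  // Create the temp file, run `body` with its absolute path, and always delete it afterwards.
  def withTempJson[A](json: String)(body: String => A): A = {
    val tempFile = Files.createTempFile("jsonTestData", ".json")
    try body(writeJson(tempFile, json))
    finally Files.deleteIfExists(tempFile)
  }
}
```

A test could wrap its queries in `withTempJson(json) { path => ... }`, or create the file in a setup hook and delete it in a teardown hook. Either way, whoever creates the file should also be the one to delete it.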