Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PPL geoip function #871

Merged
merged 10 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion docs/ppl-lang/functions/ppl-ip.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,67 @@ Note:
- `ip` can be an IPv4 or an IPv6 address
- `cidr` can be an IPv4 or an IPv6 block
- `ip` and `cidr` must be either both IPv4 or both IPv6
- `ip` and `cidr` must both be valid and non-empty/non-null
- `ip` and `cidr` must both be valid and non-empty/non-null

### `GEOIP`

**Description**

`GEOIP(ip[, property]...)` retrieves geospatial data corresponding to the provided `ip`.

**Argument type:**
kenrickyap marked this conversation as resolved.
Show resolved Hide resolved
- `ip` is string be **STRING**.
- `property` is **STRING** and must be one of the following:
- `COUNTRY_ISO_CODE`
- `COUNTRY_NAME`
- `CONTINENT_NAME`
- `REGION_ISO_CODE`
- `REGION_NAME`
- `CITY_NAME`
- `TIME_ZONE`
- `LOCATION`
- Return type:
- **STRING** if one property given
- **STRUCT_TYPE** if more than one or no property is given

Example:

_Without properties:_

os> source=ips | eval a = geoip(ip) | fields ip, a
fetched rows / total rows = 2/2
+---------------------+-------------------------------------------------------------------------------------------------------+
|ip |lol |
+---------------------+-------------------------------------------------------------------------------------------------------+
|66.249.157.90 |{JM, Jamaica, North America, 14, Saint Catherine Parish, Portmore, America/Jamaica, 17.9686,-76.8827} |
|2a09:bac2:19f8:2ac3::|{CA, Canada, North America, PE, Prince Edward Island, Charlottetown, America/Halifax, 46.2396,-63.1355}|
+---------------------+-------+------+-------------------------------------------------------------------------------------------------------+

_With one property:_

os> source=users | eval a = geoip(ip, COUNTRY_NAME) | fields ip, a
fetched rows / total rows = 2/2
+---------------------+-------+
|ip |a |
+---------------------+-------+
|66.249.157.90 |Jamaica|
|2a09:bac2:19f8:2ac3::|Canada |
+---------------------+-------+

_With multiple properties:_

os> source=users | eval a = geoip(ip, COUNTRY_NAME, REGION_NAME, CITY_NAME) | fields ip, a
fetched rows / total rows = 2/2
+---------------------+---------------------------------------------+
|ip |a |
+---------------------+---------------------------------------------+
|66.249.157.90 |{Jamaica, Saint Catherine Parish, Portmore} |
|2a09:bac2:19f8:2ac3::|{Canada, Prince Edward Island, Charlottetown}|
+---------------------+---------------------------------------------+

Note:
- To use `geoip` user must create spark table containing geo ip location data. Instructions to create table can be found [here](../../opensearch-geoip.md).
- `geoip` command by default expects the created table to be called `geoip_ip_data`.
- if a different table name is desired, can set `spark.geoip.tablename` spark config to new table name.
- `ip` can be an IPv4 or an IPv6 address.
- `geoip` commands will always calculated first if used with other eval functions.
68 changes: 68 additions & 0 deletions docs/ppl-lang/planning/ppl-geoip-command.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
## geoip syntax proposal

geoip function to add information about the geographical location of an IPv4 or IPv6 address

**Implementation syntax**
- `... | eval geoinfo = geoip([datasource,] ipAddress *[,properties])`
kenrickyap marked this conversation as resolved.
Show resolved Hide resolved
- generic syntax
- `... | eval geoinfo = geoip(ipAddress)`
- use the default geoip datasource
kenrickyap marked this conversation as resolved.
Show resolved Hide resolved
- `... | eval geoinfo = geoip("abc", ipAddress)`
- use the "abc" geoip datasource
- `... | eval geoinfo = geoip(ipAddress, city, location)`
- use the default geoip datasource, retrieve only city, and location
- `... | eval geoinfo = geoip("abc", ipAddress, city, location")`
kenrickyap marked this conversation as resolved.
Show resolved Hide resolved
- use the "abc" geoip datasource, retrieve only city, and location

**Implementation details**
- Current implementation requires user to have created a geoip table. Geoip table has the following schema:

```SQL
CREATE TABLE geoip (
cidr STRING,
country_iso_code STRING,
country_name STRING,
continent_name STRING,
region_iso_code STRING,
region_name STRING,
city_name STRING,
time_zone STRING,
location STRING,
ip_range_start BIGINT,
ip_range_end BIGINT,
ipv4 BOOLEAN
)
```

- `geoip` is resolved by performing a join on said table and projecting the resulting geoip data as a struct.
- an example of using `geoip` is equivalent to running the following SQL query:

```SQL
SELECT source.*, struct(geoip.country_name, geoip.city_name) AS a
kenrickyap marked this conversation as resolved.
Show resolved Hide resolved
FROM source, geoip
WHERE geoip.ip_range_start <= ip_to_int(source.ip)
AND geoip.ip_range_end > ip_to_int(source.ip)
AND geoip.ip_type = is_ipv4(source.ip);
```

**Future plan for additional data-sources**

- Currently only using pre-existing geoip table defined within spark is possible.
- There is future plans to allow users to specify data sources:
- API data sources - if users have their own geoip provided will create ability for users to configure and call said endpoints
- OpenSearch geospatial client - once geospatial client is published we can leverage client to utilize opensearch geo2ip functionality.

### New syntax definition in ANTLR
kenrickyap marked this conversation as resolved.
Show resolved Hide resolved

```ANTLR

// functions
evalFunctionCall
: evalFunctionName LT_PRTHS functionArgs RT_PRTHS
| geoipFunction
;

geoipFunction
: GEOIP LT_PRTHS (datasource = functionArg COMMA)? ipAddress = functionArg (COMMA properties = stringLiteral)? RT_PRTHS
;
```
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,82 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| """.stripMargin)
}

protected def createGeoIpTestTable(testTable: String): Unit = {
sql(
s"""
| CREATE TABLE $testTable
| (
| ip STRING,
| isValid BOOLEAN
| )
| USING $tableType $tableOptions
|""".stripMargin)

sql(
s"""
| INSERT INTO $testTable
| VALUES ('66.249.157.90', true),
| ('2a09:bac2:19f8:2ac3::', true),
| ('192.168.2.', false),
| ('2001:db8::ff00:12:', false)
| """.stripMargin)
}

protected def createGeoIpTable(): Unit = {
sql(
s"""
| CREATE TABLE geoip
| (
| cidr STRING,
| country_iso_code STRING,
| country_name STRING,
| continent_name STRING,
| region_iso_code STRING,
| region_name STRING,
| city_name STRING,
| time_zone STRING,
| location STRING,
| ip_range_start DECIMAL(38,0),
| ip_range_end DECIMAL(38,0),
| ipv4 BOOLEAN
| )
| USING $tableType $tableOptions
|""".stripMargin)

sql(
s"""
| INSERT INTO geoip
| VALUES (
| '66.249.157.0/24',
| 'JM',
| 'Jamaica',
| 'North America',
| '14',
| 'Saint Catherine Parish',
| 'Portmore',
| 'America/Jamaica',
| '17.9686,-76.8827',
| 1123654912,
| 1123655167,
| true
| ),
| (
| '2a09:bac2:19f8::/45',
| 'CA',
| 'Canada',
| 'North America',
| 'PE',
| 'Prince Edward Island',
| 'Charlottetown',
| 'America/Halifax',
| '46.2396,-63.1355',
| 55878094401180025937395073088449675264,
| 55878094401189697343951990121847324671,
| false
| )
| """.stripMargin)
}

protected def createNestedJsonContentTable(tempFile: Path, testTable: String): Unit = {
val json =
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLGeoipITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"
override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createGeoIpTestTable(testTable)
createGeoIpTable()
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

test("test geoip with no parameters") {
kenrickyap marked this conversation as resolved.
Show resolved Hide resolved
val frame = sql(
s"""
| source = $testTable| where isValid = true | eval a = geoip(ip) | fields ip, a
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()

// Define the expected results
val expectedResults: Array[Row] = Array(
Row("66.249.157.90", Row("JM", "Jamaica", "North America", "14", "Saint Catherine Parish", "Portmore", "America/Jamaica", "17.9686,-76.8827")),
Row("2a09:bac2:19f8:2ac3::", Row("CA", "Canada", "North America", "PE", "Prince Edward Island", "Charlottetown", "America/Halifax", "46.2396,-63.1355"))
)

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

test("test geoip with one parameters") {
val frame = sql(
s"""
| source = $testTable| where isValid = true | eval a = geoip(ip, country_name) | fields ip, a
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("66.249.157.90", "Jamaica"),
Row("2a09:bac2:19f8:2ac3::", "Canada")
)

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

test("test geoip with multiple parameters") {
val frame = sql(
s"""
| source = $testTable| where isValid = true | eval a = geoip(ip, country_name, city_name) | fields ip, a
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("66.249.157.90", Row("Jamaica", "Portmore")),
Row("2a09:bac2:19f8:2ac3::", Row("Canada", "Charlottetown"))
)

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))
kenrickyap marked this conversation as resolved.
Show resolved Hide resolved
}
}
15 changes: 12 additions & 3 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,6 @@ ISPRESENT: 'ISPRESENT';
BETWEEN: 'BETWEEN';
CIDRMATCH: 'CIDRMATCH';

// Geo Loction
GEOIP: 'GEOIP';
kenrickyap marked this conversation as resolved.
Show resolved Hide resolved

// FLOWCONTROL FUNCTIONS
IFNULL: 'IFNULL';
NULLIF: 'NULLIF';
Expand All @@ -428,6 +425,18 @@ TYPEOF: 'TYPEOF';
//OTHER CONDITIONAL EXPRESSIONS
COALESCE: 'COALESCE';

//GEOLOCATION FUNCTIONS
GEOIP: 'GEOIP';

//GEOLOCATION PROPERTIES
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this list stay consistent across different implementation, or this is just for ip2geo?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kenrickyap lets try to align according to this schema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

current list is documented in docs and design files

COUNTRY_ISO_CODE: 'COUNTRY_ISO_CODE';
COUNTRY_NAME: 'COUNTRY_NAME';
CONTINENT_NAME: 'CONTINENT_NAME';
REGION_ISO_CODE: 'REGION_ISO_CODE';
REGION_NAME: 'REGION_NAME';
CITY_NAME: 'CITY_NAME';
kenrickyap marked this conversation as resolved.
Show resolved Hide resolved
LOCATION: 'LOCATION';

// RELEVANCE FUNCTIONS AND PARAMETERS
MATCH: 'MATCH';
MATCH_PHRASE: 'MATCH_PHRASE';
Expand Down
Loading
Loading