Skip to content

Commit

Permalink
Convert ingest processor supports ip type (opensearch-project#12818)
Browse files Browse the repository at this point in the history
* Convert ingest processor supports ip type

Signed-off-by: Gao Binlong <gbinlong@amazon.com>

* Modify change log

Signed-off-by: Gao Binlong <gbinlong@amazon.com>

* Add comment

Signed-off-by: Gao Binlong <gbinlong@amazon.com>

---------

Signed-off-by: Gao Binlong <gbinlong@amazon.com>
  • Loading branch information
gaobinlong authored Mar 27, 2024
1 parent 618782d commit 8d5a1d2
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Convert ingest processor supports ip type ([#12818](https://github.com/opensearch-project/OpenSearch/pull/12818))
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.ingest.common;

import org.opensearch.common.network.InetAddresses;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.ingest.IngestDocument;
Expand Down Expand Up @@ -118,6 +119,19 @@ public Object convert(Object value) {
return value.toString();
}
},
IP {
@Override
public Object convert(Object value) {
// If the value is a valid ipv4/ipv6 address, we return the original value directly because IpFieldType
// can accept string value, this is simpler than we return an InetAddress object which needs to do more
// work such as serialization
if (value instanceof String && InetAddresses.isInetAddress(value.toString())) {
return value;
} else {
throw new IllegalArgumentException("[" + value + "] is not a valid ipv4/ipv6 address");
}
}
},
AUTO {
@Override
public Object convert(Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,4 +550,29 @@ public void testTargetField() throws Exception {
assertThat(ingestDocument.getFieldValue(fieldName, String.class), equalTo(String.valueOf(randomInt)));
assertThat(ingestDocument.getFieldValue(targetField, Integer.class), equalTo(randomInt));
}

public void testConvertIP() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String validIPString;
if (randomBoolean()) {
validIPString = "1.2.3.4";
} else {
validIPString = "::1";
}
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, validIPString);

Processor processor = new ConvertProcessor(randomAlphaOfLength(10), null, fieldName, fieldName, Type.IP, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(fieldName, String.class), equalTo(validIPString));

String invalidIPString = randomAlphaOfLength(10);
fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, invalidIPString);
Processor processorWithInvalidIP = new ConvertProcessor(randomAlphaOfLength(10), null, fieldName, fieldName, Type.IP, false);
try {
processorWithInvalidIP.execute(ingestDocument);
fail("processor execute should have failed");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("[" + invalidIPString + "] is not a valid ipv4/ipv6 address"));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "1"
ignore: 404

---
"Test convert processor with ip type":
- skip:
version: " - 2.13.99"
reason: "introduced in 2.14.0"
- do:
ingest.put_pipeline:
id: "1"
body: >
{
"processors": [
{
"convert" : {
"field" : "raw_ip",
"type": "ip"
}
}
]
}
- match: { acknowledged: true }

- do:
catch: /\[1.1.1.\] is not a valid ipv4\/ipv6 address/
index:
index: test
id: 1
pipeline: "1"
body: {
raw_ip: "1.1.1."
}

- do:
ingest.put_pipeline:
id: "1"
body: >
{
"processors": [
{
"convert" : {
"field" : "raw_ip",
"target_field" : "ip_field",
"type" : "ip",
"ignore_failure" : true
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "1"
body: {
raw_ip: "1.1.1."
}
- do:
get:
index: test
id: 1
- match: { _source: { raw_ip: "1.1.1."} }

- do:
index:
index: test
id: 1
pipeline: "1"
body: {
raw_ip: "1.1.1.1"
}
- do:
get:
index: test
id: 1
- match: { _source: { raw_ip: "1.1.1.1", ip_field: "1.1.1.1"} }

0 comments on commit 8d5a1d2

Please sign in to comment.