Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Script: Move Map impl from IngestSourceAndMetadata to CtxMap #5

Open
wants to merge 4 commits into
base: ingest_ctx_map
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestCtxMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest;

import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.script.CtxMap;
import org.elasticsearch.script.Metadata;

import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Map containing ingest source and metadata.
*
* The Metadata values in {@link IngestDocument.Metadata} are validated when put in the map.
* _index, _id and _routing must be a String or null
* _version_type must be a lower case VersionType or null
* _version must be representable as a long without loss of precision or null
* _dyanmic_templates must be a map
* _if_seq_no must be a long or null
* _if_primary_term must be a long or null
*
* The map is expected to be used by processors, server code should the typed getter and setters where possible.
*/
class IngestCtxMap extends CtxMap {

/**
* Create an IngestCtxMap with the given metadata, source and default validators
*/
IngestCtxMap(
String index,
String id,
long version,
String routing,
VersionType versionType,
ZonedDateTime timestamp,
Map<String, Object> source
) {
super(new HashMap<>(source), new Metadata(index, id, version, routing, versionType, timestamp));
}

/**
* Create IngestCtxMap from a source and metadata
*
* @param source the source document map
* @param metadata the metadata map
*/
IngestCtxMap(Map<String, Object> source, Metadata metadata) {
super(source, metadata);
}

/**
* Returns a new metadata map and the existing source map with metadata removed.
*/
public static Tuple<Map<String, Object>, Map<String, Object>> splitSourceAndMetadata(Map<String, Object> sourceAndMetadata) {
return CtxMap.splitSourceAndMetadata(
sourceAndMetadata,
Arrays.stream(IngestDocument.Metadata.values()).map(IngestDocument.Metadata::getFieldName).collect(Collectors.toSet())
);
}

/**
* Fetch the timestamp from the ingestMetadata, if it exists
* @return the timestamp for the document or null
*/
public static ZonedDateTime getTimestamp(Map<String, Object> ingestMetadata) {
if (ingestMetadata == null) {
return null;
}
Object ts = ingestMetadata.get(IngestDocument.TIMESTAMP);
if (ts instanceof ZonedDateTime timestamp) {
return timestamp;
} else if (ts instanceof String str) {
return ZonedDateTime.parse(str);
}
return null;
}
}
28 changes: 10 additions & 18 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public final class IngestDocument {

static final String TIMESTAMP = "timestamp";

private final IngestSourceAndMetadata sourceAndMetadata;
private final IngestCtxMap sourceAndMetadata;
private final Map<String, Object> ingestMetadata;

// Contains all pipelines that have been executed for this document
Expand All @@ -58,15 +58,7 @@ public final class IngestDocument {
private boolean doNoSelfReferencesCheck = false;

public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map<String, Object> source) {
this.sourceAndMetadata = new IngestSourceAndMetadata(
index,
id,
version,
routing,
versionType,
ZonedDateTime.now(ZoneOffset.UTC),
source
);
this.sourceAndMetadata = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source);
this.ingestMetadata = new HashMap<>();
this.ingestMetadata.put(TIMESTAMP, sourceAndMetadata.getMetadata().getTimestamp());
}
Expand All @@ -76,7 +68,7 @@ public IngestDocument(String index, String id, long version, String routing, Ver
*/
public IngestDocument(IngestDocument other) {
this(
new IngestSourceAndMetadata(deepCopyMap(other.sourceAndMetadata.getSource()), other.sourceAndMetadata.getMetadata().clone()),
new IngestCtxMap(deepCopyMap(other.sourceAndMetadata.getSource()), other.sourceAndMetadata.getMetadata().clone()),
deepCopyMap(other.ingestMetadata)
);
}
Expand All @@ -85,10 +77,10 @@ public IngestDocument(IngestDocument other) {
* Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied.
*/
public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object> ingestMetadata) {
Tuple<Map<String, Object>, Map<String, Object>> sm = IngestSourceAndMetadata.splitSourceAndMetadata(sourceAndMetadata);
this.sourceAndMetadata = new IngestSourceAndMetadata(
Tuple<Map<String, Object>, Map<String, Object>> sm = IngestCtxMap.splitSourceAndMetadata(sourceAndMetadata);
this.sourceAndMetadata = new IngestCtxMap(
sm.v1(),
new org.elasticsearch.script.Metadata(sm.v2(), IngestSourceAndMetadata.getTimestamp(ingestMetadata))
new org.elasticsearch.script.Metadata(sm.v2(), IngestCtxMap.getTimestamp(ingestMetadata))
);
this.ingestMetadata = new HashMap<>(ingestMetadata);
this.ingestMetadata.computeIfPresent(TIMESTAMP, (k, v) -> {
Expand All @@ -102,7 +94,7 @@ public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object>
/**
* Constructor to create an IngestDocument from its constituent maps
*/
IngestDocument(IngestSourceAndMetadata sourceAndMetadata, Map<String, Object> ingestMetadata) {
IngestDocument(IngestCtxMap sourceAndMetadata, Map<String, Object> ingestMetadata) {
this.sourceAndMetadata = sourceAndMetadata;
this.ingestMetadata = ingestMetadata;
}
Expand Down Expand Up @@ -724,9 +716,9 @@ public Map<String, Object> getSourceAndMetadata() {
}

/**
* Get source and metadata map as {@link IngestSourceAndMetadata}
* Get source and metadata map as {@link IngestCtxMap}
*/
public IngestSourceAndMetadata getIngestSourceAndMetadata() {
public IngestCtxMap getIngestSourceAndMetadata() {
return sourceAndMetadata;
}

Expand Down Expand Up @@ -763,7 +755,7 @@ public static Object deepCopy(Object value) {
for (Map.Entry<?, ?> entry : mapValue.entrySet()) {
copy.put(entry.getKey(), deepCopy(entry.getValue()));
}
// TODO(stu): should this check for IngestSourceAndMetadata in addition to Map?
// TODO(stu): should this check for IngestCtxMap in addition to Map?
return copy;
} else if (value instanceof List<?> listValue) {
List<Object> copy = new ArrayList<>(listValue.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@
* Side Public License, v 1.
*/

package org.elasticsearch.ingest;
package org.elasticsearch.script;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.script.Metadata;

import java.time.ZonedDateTime;
import java.util.AbstractCollection;
import java.util.AbstractMap;
import java.util.AbstractSet;
Expand All @@ -26,46 +23,17 @@
import java.util.Set;
import java.util.stream.Collectors;

/**
* Map containing ingest source and metadata.
*
* The Metadata values in {@link IngestDocument.Metadata} are validated when put in the map.
* _index, _id and _routing must be a String or null
* _version_type must be a lower case VersionType or null
* _version must be representable as a long without loss of precision or null
* _dyanmic_templates must be a map
* _if_seq_no must be a long or null
* _if_primary_term must be a long or null
*
* The map is expected to be used by processors, server code should the typed getter and setters where possible.
*/
class IngestSourceAndMetadata extends AbstractMap<String, Object> {

public class CtxMap extends AbstractMap<String, Object> {
protected final Map<String, Object> source;
protected final Metadata metadata;

/**
* Create an IngestSourceAndMetadata with the given metadata, source and default validators
*/
IngestSourceAndMetadata(
String index,
String id,
long version,
String routing,
VersionType versionType,
ZonedDateTime timestamp,
Map<String, Object> source
) {
this(new HashMap<>(source), new Metadata(index, id, version, routing, versionType, timestamp));
}

/**
* Create IngestSourceAndMetadata from a source and metadata
* Create CtxMap from a source and metadata
*
* @param source the source document map
* @param metadata the metadata map
*/
IngestSourceAndMetadata(Map<String, Object> source, Metadata metadata) {
protected CtxMap(Map<String, Object> source, Metadata metadata) {
this.source = source != null ? source : new HashMap<>();
this.metadata = metadata;
Set<String> badKeys = Sets.intersection(this.metadata.keySet(), this.source.keySet());
Expand All @@ -81,38 +49,25 @@ class IngestSourceAndMetadata extends AbstractMap<String, Object> {
/**
* Returns a new metadata map and the existing source map with metadata removed.
*/
public static Tuple<Map<String, Object>, Map<String, Object>> splitSourceAndMetadata(Map<String, Object> sourceAndMetadata) {
if (sourceAndMetadata instanceof IngestSourceAndMetadata ingestSourceAndMetadata) {
return new Tuple<>(new HashMap<>(ingestSourceAndMetadata.source), new HashMap<>(ingestSourceAndMetadata.metadata.getMap()));
public static Tuple<Map<String, Object>, Map<String, Object>> splitSourceAndMetadata(
Map<String, Object> sourceAndMetadata,
Set<String> metadataKeys
) {
if (sourceAndMetadata instanceof CtxMap ctxMap) {
return new Tuple<>(new HashMap<>(ctxMap.source), new HashMap<>(ctxMap.metadata.getMap()));
}
Map<String, Object> metadata = Maps.newHashMapWithExpectedSize(IngestDocument.Metadata.values().length);

Map<String, Object> metadata = Maps.newHashMapWithExpectedSize(metadataKeys.size());
Map<String, Object> source = new HashMap<>(sourceAndMetadata);
for (IngestDocument.Metadata ingestDocumentMetadata : IngestDocument.Metadata.values()) {
String metadataName = ingestDocumentMetadata.getFieldName();
if (sourceAndMetadata.containsKey(metadataName)) {
metadata.put(metadataName, source.remove(metadataName));

for (String metadataKey : metadataKeys) {
if (sourceAndMetadata.containsKey(metadataKey)) {
metadata.put(metadataKey, source.remove(metadataKey));
}
}
return new Tuple<>(source, metadata);
}

/**
* Fetch the timestamp from the ingestMetadata, if it exists
* @return the timestamp for the document or null
*/
public static ZonedDateTime getTimestamp(Map<String, Object> ingestMetadata) {
if (ingestMetadata == null) {
return null;
}
Object ts = ingestMetadata.get(IngestDocument.TIMESTAMP);
if (ts instanceof ZonedDateTime timestamp) {
return timestamp;
} else if (ts instanceof String str) {
return ZonedDateTime.parse(str);
}
return null;
}

/**
* get the source map, if externally modified then the guarantees of this class are not enforced
*/
Expand Down Expand Up @@ -326,10 +281,10 @@ public Object setValue(Object value) {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if ((o instanceof CtxMap) == false) return false;
if (super.equals(o) == false) return false;
IngestSourceAndMetadata that = (IngestSourceAndMetadata) o;
return Objects.equals(source, that.source) && Objects.equals(metadata, that.metadata);
CtxMap ctxMap = (CtxMap) o;
return source.equals(ctxMap.source) && metadata.equals(ctxMap.metadata);
}

@Override
Expand Down
Loading