Skip to content

Commit

Permalink
Extract some methods from IngestService's innerExecute method (#93213)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo authored Jan 24, 2023
1 parent 2cad226 commit 601a0a5
Showing 1 changed file with 62 additions and 38 deletions.
100 changes: 62 additions & 38 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -875,41 +875,17 @@ private void innerExecute(
return;
}

String index = indexRequest.index();
String id = indexRequest.id();
String routing = indexRequest.routing();
long version = indexRequest.version();
VersionType versionType = indexRequest.versionType();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, id, version, routing, versionType, sourceAsMap);
IngestDocument ingestDocument = newIngestDocument(indexRequest);
ingestDocument.executePipeline(pipeline, (result, e) -> {
if (e != null) {
handler.accept(e);
} else if (result == null) {
itemDroppedHandler.accept(slot);
handler.accept(null);
} else {
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();

// it's fine to set all metadata fields all the time, as ingest document holds their starting values
// before ingestion, which might also get modified during ingestion.
indexRequest.index(metadata.getIndex());
indexRequest.id(metadata.getId());
indexRequest.routing(metadata.getRouting());
indexRequest.version(metadata.getVersion());
if (metadata.getVersionType() != null) {
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
}
Number number;
if ((number = metadata.getIfSeqNo()) != null) {
indexRequest.setIfSeqNo(number.longValue());
}
if ((number = metadata.getIfPrimaryTerm()) != null) {
indexRequest.setIfPrimaryTerm(number.longValue());
}
updateIndexRequestMetadata(indexRequest, ingestDocument.getMetadata());
try {
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
updateIndexRequestSource(indexRequest, ingestDocument);
} catch (IllegalArgumentException ex) {
// An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
// In that case, we catch and wrap the exception, so we can include which pipeline failed.
Expand All @@ -930,25 +906,73 @@ private void innerExecute(
);
return;
}
Map<String, String> map;
if ((map = metadata.getDynamicTemplates()) != null) {
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
mergedDynamicTemplates.putAll(map);
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
}
postIngest(ingestDocument, indexRequest);
cacheRawTimestamp(indexRequest, ingestDocument);

handler.accept(null);
}
});
}

private void postIngest(IngestDocument ingestDocument, IndexRequest indexRequest) {
if (indexRequest.getRawTimestamp() == null) {
/**
* Builds a new ingest document from the passed-in index request.
*/
private static IngestDocument newIngestDocument(final IndexRequest request) {
return new IngestDocument(
request.index(),
request.id(),
request.version(),
request.routing(),
request.versionType(),
request.sourceAsMap()
);
}

/**
* Updates an index request based on the metadata of an ingest document.
*/
private static void updateIndexRequestMetadata(final IndexRequest request, final org.elasticsearch.script.Metadata metadata) {
// it's fine to set all metadata fields all the time, as ingest document holds their starting values
// before ingestion, which might also get modified during ingestion.
request.index(metadata.getIndex());
request.id(metadata.getId());
request.routing(metadata.getRouting());
request.version(metadata.getVersion());
if (metadata.getVersionType() != null) {
request.versionType(VersionType.fromString(metadata.getVersionType()));
}
Number number;
if ((number = metadata.getIfSeqNo()) != null) {
request.setIfSeqNo(number.longValue());
}
if ((number = metadata.getIfPrimaryTerm()) != null) {
request.setIfPrimaryTerm(number.longValue());
}
Map<String, String> map;
if ((map = metadata.getDynamicTemplates()) != null) {
Map<String, String> mergedDynamicTemplates = new HashMap<>(request.getDynamicTemplates());
mergedDynamicTemplates.putAll(map);
request.setDynamicTemplates(mergedDynamicTemplates);
}
}

/**
* Updates an index request based on the source of an ingest document, guarding against self-references if necessary.
*/
private static void updateIndexRequestSource(final IndexRequest request, final IngestDocument document) {
boolean ensureNoSelfReferences = document.doNoSelfReferencesCheck();
request.source(document.getSource(), request.getContentType(), ensureNoSelfReferences);
}

/**
* Grab the @timestamp and store it on the index request so that TSDB can use it without needing to parse
* the source for this document.
*/
private static void cacheRawTimestamp(final IndexRequest request, final IngestDocument document) {
if (request.getRawTimestamp() == null) {
// cache the @timestamp from the ingest document's source map if there is one
Object rawTimestamp = ingestDocument.getSource().get(TimestampField.FIXED_TIMESTAMP_FIELD);
Object rawTimestamp = document.getSource().get(TimestampField.FIXED_TIMESTAMP_FIELD);
if (rawTimestamp != null) {
indexRequest.setRawTimestamp(rawTimestamp);
request.setRawTimestamp(rawTimestamp);
}
}
}
Expand Down

0 comments on commit 601a0a5

Please sign in to comment.