Skip to content

Commit

Permalink
Merge branch 'main' into cache_key_serialization
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com>
  • Loading branch information
sgup432 authored Jan 9, 2024
2 parents bde4ec9 + a9ce180 commit 0850e11
Show file tree
Hide file tree
Showing 34 changed files with 1,904 additions and 85 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275))
- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
- Update the indexRandom function to create more segments for concurrent search tests ([#10247](https://github.com/opensearch-project/OpenSearch/pull/10247))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
Expand Down Expand Up @@ -215,6 +216,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix template setting override for replication type ([#11417](https://github.com/opensearch-project/OpenSearch/pull/11417))
- Fix Automatic addition of protocol broken in #11512 ([#11609](https://github.com/opensearch-project/OpenSearch/pull/11609))
- Fix issue when calling Delete PIT endpoint and no PITs exist ([#11711](https://github.com/opensearch-project/OpenSearch/pull/11711))
- Fix tracing context propagation for local transport instrumentation ([#11490](https://github.com/opensearch-project/OpenSearch/pull/11490))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,11 @@ public IngestDocument execute(IngestDocument document) {
} else {
ingestScript = precompiledIngestScript;
}
ingestScript.execute(document.getSourceAndMetadata());
CollectionUtils.ensureNoSelfReferences(document.getSourceAndMetadata(), "ingest script");
IngestDocument mutableDocument = new IngestDocument(document);
ingestScript.execute(mutableDocument.getSourceAndMetadata());
CollectionUtils.ensureNoSelfReferences(mutableDocument.getSourceAndMetadata(), "ingest script");
document.getSourceAndMetadata().clear();
document.getSourceAndMetadata().putAll(mutableDocument.getSourceAndMetadata());
return document;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,16 @@ private void assertIngestDocument(IngestDocument ingestDocument) {
int bytesTotal = ingestDocument.getFieldValue("bytes_in", Integer.class) + ingestDocument.getFieldValue("bytes_out", Integer.class);
assertThat(ingestDocument.getSourceAndMetadata().get("bytes_total"), is(bytesTotal));
}

/**
 * Checks that executing the script processor on a document whose source-and-metadata map
 * contains itself fails with an {@link IllegalArgumentException}.
 */
public void testScriptingWithSelfReferencingSourceMetadata() {
    final ScriptProcessor scriptProcessor = new ScriptProcessor(randomAlphaOfLength(10), null, script, null, scriptService);

    // Work on the live map of a random document so the cycle can be introduced directly.
    final Map<String, Object> cyclicMap = randomDocument().getSourceAndMetadata();
    final String indexName = cyclicMap.get(IngestDocument.Metadata.INDEX.getFieldName()).toString();
    final String documentId = cyclicMap.get(IngestDocument.Metadata.ID.getFieldName()).toString();

    // Store the map inside itself under "_source" to create the self reference.
    cyclicMap.put("_source", cyclicMap);

    final IngestDocument selfReferencingDocument = new IngestDocument(indexName, documentId, null, null, null, cyclicMap);
    expectThrows(IllegalArgumentException.class, () -> scriptProcessor.execute(selfReferencingDocument));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,79 @@ teardown:
id: 1
- match: { _source.source_field: "foo%20bar" }
- match: { _source.target_field: "foo bar" }

---
"Test self referencing source with ignore failure":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"script" : {
"lang": "painless",
"source" : "ctx.foo['foo']=ctx.foo;ctx['test-field']='test-value'",
"ignore_failure": true
}
},
{
"script" : {
"lang": "painless",
"source" : "ctx.target_field = Processors.uppercase(ctx.source_field)"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "my_pipeline"
body: {source_field: "fooBar", foo: {foo: "bar"}}

- do:
get:
index: test
id: 1
- match: { _source.source_field: "fooBar" }
- match: { _source.target_field: "FOOBAR"}
- match: { _source.test-field: null}

---
"Test self referencing source without ignoring failure":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"script" : {
"lang": "painless",
"source" : "ctx.foo['foo']=ctx.foo;ctx['test-field']='test-value'"
}
},
{
"script" : {
"lang": "painless",
"source" : "ctx.target_field = Processors.uppercase(ctx.source_field)"
}
}
]
}
- match: { acknowledged: true }

- do:
catch: bad_request
index:
index: test
id: 1
pipeline: "my_pipeline"
body: {source_field: "fooBar", foo: {foo: "bar"}}
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Iterable object is self-referencing itself (ingest script)" }
Original file line number Diff line number Diff line change
Expand Up @@ -1113,3 +1113,48 @@ teardown:
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Failed to parse parameter [_if_primary_term], only int or long is accepted" }

---
"Test simulate with pipeline with ignore failure and cyclic field assignments in script":
- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"script" : {
"ignore_failure" : true,
"lang": "painless",
"source": "ctx.foo['foo']=ctx.foo;ctx.tag='recursive'"
}
},
{
"script" : {
"lang": "painless",
"source" : "ctx.target_field = Processors.uppercase(ctx.foo.foo)"
}
}
]
},
"docs": [
{
"_source": {
"foo": {
"foo": "bar"
}
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 2 }
- match: { docs.0.processor_results.0.status: "error_ignored" }
- match: { docs.0.processor_results.0.ignored_error.error.type: "illegal_argument_exception" }
- match: { docs.0.processor_results.0.doc._source.tag: null }
- match: { docs.0.processor_results.1.doc._source.target_field: "BAR" }
- match: { docs.0.processor_results.1.doc._source.foo.foo: "bar" }
- match: { docs.0.processor_results.1.status: "success" }
- match: { docs.0.processor_results.1.processor_type: "script" }
112 changes: 59 additions & 53 deletions server/src/main/java/org/opensearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -424,68 +424,74 @@ public V computeIfAbsent(K key, CacheLoader<K, V> loader) throws ExecutionExcept
}
});
if (value == null) {
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();
value = compute(key, loader);
}
return value;
}

try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, completableFuture);
}
private V compute(K key, CacheLoader<K, V> loader) throws ExecutionException {
long now = now();
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
// get the value from this future on the thread that won the race to place the future into the segment map
CacheSegment<K, V> segment = getCacheSegment(key);
CompletableFuture<Entry<K, V>> future;
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();

BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
};
try (ReleasableLock ignored = segment.writeLock.acquire()) {
future = segment.map.putIfAbsent(key, completableFuture);
}

CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
}
return ok.value;
} else {
completableValue = future.handle(handler);
try (ReleasableLock ignored = segment.writeLock.acquire()) {
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
if (sanity != null && sanity.isCompletedExceptionally()) {
segment.map.remove(key);
}
}
return null;
}
};

CompletableFuture<V> completableValue;
if (future == null) {
future = completableFuture;
completableValue = future.handle(handler);
V loaded;
try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
loaded = loader.load(key);
} catch (Exception e) {
future.completeExceptionally(e);
throw new ExecutionException(e);
}
if (loaded == null) {
NullPointerException npe = new NullPointerException("loader returned a null value");
future.completeExceptionally(npe);
throw new ExecutionException(npe);
} else {
future.complete(new Entry<>(key, loaded, now));
}
} else {
completableValue = future.handle(handler);
}
V value;
try {
value = completableValue.get();
// check to ensure the future hasn't been completed with an exception
if (future.isCompletedExceptionally()) {
future.get(); // call get to force the exception to be thrown for other concurrent callers
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return value;
}
Expand Down
34 changes: 34 additions & 0 deletions server/src/main/java/org/opensearch/common/cache/ICache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

/**
* Represents a cache interface.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public interface ICache<K, V> {
/**
 * Looks up the value cached for the given key.
 * NOTE(review): the result on a cache miss (presumably {@code null}) is not specified by this
 * interface - confirm against the implementations.
 *
 * @param key key to look up
 * @return the value associated with {@code key}
 */
V get(K key);

/**
 * Stores a value under the given key.
 * NOTE(review): whether an existing mapping is overwritten is left to implementations - confirm.
 *
 * @param key key to store the value under
 * @param value value to cache
 */
void put(K key, V value);

/**
 * Returns the value for the given key, using the supplied loader when needed.
 * NOTE(review): presumably the loader is only invoked when no value is cached for {@code key},
 * and {@link LoadAwareCacheLoader#isLoaded()} lets the caller tell which path was taken - confirm.
 *
 * @param key key to look up or load
 * @param loader loader used to produce the value
 * @return the value associated with {@code key}
 * @throws Exception declared broadly; the concrete failure types depend on the implementation and loader
 */
V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception;

/**
 * Invalidates the entry for the given key.
 *
 * @param key key whose entry should be invalidated
 */
void invalidate(K key);

/**
 * Invalidates all entries of this cache.
 */
void invalidateAll();

/**
 * Exposes the keys of this cache for iteration.
 * NOTE(review): whether the returned iterable is a snapshot or a live view is not specified here - confirm.
 *
 * @return an iterable over the keys
 */
Iterable<K> keys();

/**
 * Reports a count for this cache.
 * NOTE(review): presumably the number of entries currently held - confirm.
 *
 * @return the count as a {@code long}
 */
long count();

/**
 * Refreshes this cache.
 * NOTE(review): what a refresh entails (e.g. expiring stale entries) is implementation specific - confirm.
 */
void refresh();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

/**
* Extends a cache loader with awareness of whether the data is loaded or not.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.internal
*/
public interface LoadAwareCacheLoader<K, V> extends CacheLoader<K, V> {
/**
 * Reports whether the data is loaded.
 * NOTE(review): presumably becomes {@code true} once this loader has actually been invoked to
 * load a value, allowing callers to distinguish a load from a cache hit - confirm against callers.
 *
 * @return {@code true} if the data is loaded, {@code false} otherwise
 */
boolean isLoaded();
}
Loading

0 comments on commit 0850e11

Please sign in to comment.