Skip to content

Commit

Permalink
Merge from main
Browse files Browse the repository at this point in the history
Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>
  • Loading branch information
harshavamsi committed Apr 30, 2024
1 parent fc94f6c commit f690458
Show file tree
Hide file tree
Showing 80 changed files with 4,665 additions and 315 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531))
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Tiered Caching] Expose new cache stats API ([#13237](https://github.com/opensearch-project/OpenSearch/pull/13237))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
- [Tiered Caching] Gate new stats logic behind FeatureFlags.PLUGGABLE_CACHE ([#13238](https://github.com/opensearch-project/OpenSearch/pull/13238))
- [Tiered Caching] Add a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
- [Batch Ingestion] Add `batch_size` to `_bulk` API. ([#12457](https://github.com/opensearch-project/OpenSearch/issues/12457))
- [Tiered caching] Make Indices Request Cache Stale Key Mgmt Threshold setting dynamic ([#12941](https://github.com/opensearch-project/OpenSearch/pull/12941))
- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746))
- [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225))
- [Remote Store] Upload remote paths during remote enabled index creation ([#13386](https://github.com/opensearch-project/OpenSearch/pull/13386))
- [Search Pipeline] Handle default pipeline for multiple indices ([#13276](https://github.com/opensearch-project/OpenSearch/pull/13276))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand All @@ -57,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.netflix.nebula.ospackage-base` from 11.8.1 to 11.9.0 ([#13440](https://github.com/opensearch-project/OpenSearch/pull/13440))
- Bump `org.bouncycastle:bc-fips` from 1.0.2.4 to 1.0.2.5 ([#13446](https://github.com/opensearch-project/OpenSearch/pull/13446))
- Bump `lycheeverse/lychee-action` from 1.9.3 to 1.10.0 ([#13447](https://github.com/opensearch-project/OpenSearch/pull/13447))
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))

### Changed
- [BWC and API enforcement] Enforcing the presence of API annotations at build time ([#12872](https://github.com/opensearch-project/OpenSearch/pull/12872))
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ plugins {
id 'opensearch.docker-support'
id 'opensearch.global-build-info'
id "com.diffplug.spotless" version "6.25.0" apply false
id "org.gradle.test-retry" version "1.5.8" apply false
id "org.gradle.test-retry" version "1.5.9" apply false
id "test-report-aggregation"
id 'jacoco-report-aggregation'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
Expand All @@ -90,6 +91,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

Expand Down Expand Up @@ -642,12 +645,47 @@ public <K, V> Map<K, V> readMap(Writeable.Reader<K> keyReader, Writeable.Reader<
return Collections.emptyMap();
}
Map<K, V> map = new HashMap<>(size);
readIntoMap(keyReader, valueReader, map, size);
return map;
}

/**
* Read a serialized map into a SortedMap using the default ordering for the keys. If the result is empty it might be immutable.
*/
public <K extends Comparable<K>, V> SortedMap<K, V> readOrderedMap(Writeable.Reader<K> keyReader, Writeable.Reader<V> valueReader)
throws IOException {
return readOrderedMap(keyReader, valueReader, null);
}

/**
* Read a serialized map into a SortedMap, specifying a Comparator for the keys. If the result is empty it might be immutable.
*/
public <K extends Comparable<K>, V> SortedMap<K, V> readOrderedMap(
Writeable.Reader<K> keyReader,
Writeable.Reader<V> valueReader,
@Nullable Comparator<K> keyComparator
) throws IOException {
int size = readArraySize();
if (size == 0) {
return Collections.emptySortedMap();
}
SortedMap<K, V> sortedMap;
if (keyComparator == null) {
sortedMap = new TreeMap<>();
} else {
sortedMap = new TreeMap<>(keyComparator);
}
readIntoMap(keyReader, valueReader, sortedMap, size);
return sortedMap;
}

private <K, V> void readIntoMap(Writeable.Reader<K> keyReader, Writeable.Reader<V> valueReader, Map<K, V> map, int size)
throws IOException {
for (int i = 0; i < size; i++) {
K key = keyReader.read(this);
V value = valueReader.read(this);
map.put(key, value);
}
return map;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void close() throws IOException {
}

@Override
public ImmutableCacheStatsHolder stats() {
public ImmutableCacheStatsHolder stats(String[] levels) {
return null; // TODO: in TSC stats PR
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public ImmutableCacheStatsHolder stats() {
return null;
}

@Override
public ImmutableCacheStatsHolder stats(String[] levels) {
return null;
}

@Override
public void close() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,90 @@ teardown:
index: test_index
id: test_id3
- match: { _source: {"f1": "v2", "f2": 47, "field1": "value1"}}

---
"Test bulk API with batch enabled happy case":
- skip:
version: " - 2.13.99"
reason: "Added in 2.14.0"

- do:
bulk:
refresh: true
batch_size: 2
pipeline: "pipeline1"
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
- '{"text": "text1"}'
- '{"index": {"_index": "test_index", "_id": "test_id2"}}'
- '{"text": "text2"}'
- '{"index": {"_index": "test_index", "_id": "test_id3"}}'
- '{"text": "text3"}'
- '{"index": {"_index": "test_index", "_id": "test_id4"}}'
- '{"text": "text4"}'
- '{"index": {"_index": "test_index", "_id": "test_id5", "pipeline": "pipeline2"}}'
- '{"text": "text5"}'
- '{"index": {"_index": "test_index", "_id": "test_id6", "pipeline": "pipeline2"}}'
- '{"text": "text6"}'

- match: { errors: false }

- do:
get:
index: test_index
id: test_id5
- match: { _source: {"text": "text5", "field2": "value2"}}

- do:
get:
index: test_index
id: test_id3
- match: { _source: { "text": "text3", "field1": "value1" } }

---
"Test bulk API with batch_size missing":
- skip:
version: " - 2.13.99"
reason: "Added in 2.14.0"

- do:
bulk:
refresh: true
pipeline: "pipeline1"
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
- '{"text": "text1"}'
- '{"index": {"_index": "test_index", "_id": "test_id2"}}'
- '{"text": "text2"}'

- match: { errors: false }

- do:
get:
index: test_index
id: test_id1
- match: { _source: { "text": "text1", "field1": "value1" } }

- do:
get:
index: test_index
id: test_id2
- match: { _source: { "text": "text2", "field1": "value1" } }

---
"Test bulk API with invalid batch_size":
- skip:
version: " - 2.13.99"
reason: "Added in 2.14.0"

- do:
catch: bad_request
bulk:
refresh: true
batch_size: -1
pipeline: "pipeline1"
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
- '{"text": "text1"}'
- '{"index": {"_index": "test_index", "_id": "test_id2"}}'
- '{"text": "text2"}'
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private EhcacheDiskCache(Builder<K, V> builder) {
this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder);
List<String> dimensionNames = Objects.requireNonNull(builder.dimensionNames, "Dimension names can't be null");
// If this cache is being used, FeatureFlags.PLUGGABLE_CACHE is already on, so we can always use the DefaultCacheStatsHolder.
this.cacheStatsHolder = new DefaultCacheStatsHolder(dimensionNames);
this.cacheStatsHolder = new DefaultCacheStatsHolder(dimensionNames, EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME);
}

@SuppressWarnings({ "rawtypes" })
Expand Down Expand Up @@ -446,12 +446,13 @@ public void close() {
}

/**
* Relevant stats for this cache.
* @return CacheStats
* Relevant stats for this cache, aggregated by levels.
* @param levels The levels to aggregate by.
* @return ImmutableCacheStatsHolder
*/
@Override
public ImmutableCacheStatsHolder stats() {
return cacheStatsHolder.getImmutableCacheStatsHolder();
public ImmutableCacheStatsHolder stats(String[] levels) {
return cacheStatsHolder.getImmutableCacheStatsHolder(levels);
}

/**
Expand Down Expand Up @@ -510,15 +511,15 @@ private long getNewValuePairSize(CacheEvent<? extends ICacheKey<K>, ? extends By
public void onEvent(CacheEvent<? extends ICacheKey<K>, ? extends ByteArrayWrapper> event) {
switch (event.getType()) {
case CREATED:
cacheStatsHolder.incrementEntries(event.getKey().dimensions);
cacheStatsHolder.incrementItems(event.getKey().dimensions);
cacheStatsHolder.incrementSizeInBytes(event.getKey().dimensions, getNewValuePairSize(event));
assert event.getOldValue() == null;
break;
case EVICTED:
this.removalListener.onRemoval(
new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.EVICTED)
);
cacheStatsHolder.decrementEntries(event.getKey().dimensions);
cacheStatsHolder.decrementItems(event.getKey().dimensions);
cacheStatsHolder.decrementSizeInBytes(event.getKey().dimensions, getOldValuePairSize(event));
cacheStatsHolder.incrementEvictions(event.getKey().dimensions);
assert event.getNewValue() == null;
Expand All @@ -527,15 +528,15 @@ public void onEvent(CacheEvent<? extends ICacheKey<K>, ? extends ByteArrayWrappe
this.removalListener.onRemoval(
new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.EXPLICIT)
);
cacheStatsHolder.decrementEntries(event.getKey().dimensions);
cacheStatsHolder.decrementItems(event.getKey().dimensions);
cacheStatsHolder.decrementSizeInBytes(event.getKey().dimensions, getOldValuePairSize(event));
assert event.getNewValue() == null;
break;
case EXPIRED:
this.removalListener.onRemoval(
new RemovalNotification<>(event.getKey(), deserializeValue(event.getOldValue()), RemovalReason.INVALIDATED)
);
cacheStatsHolder.decrementEntries(event.getKey().dimensions);
cacheStatsHolder.decrementItems(event.getKey().dimensions);
cacheStatsHolder.decrementSizeInBytes(event.getKey().dimensions, getOldValuePairSize(event));
assert event.getNewValue() == null;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testBasicGetAndPut() throws IOException {
String value = ehcacheTest.get(getICacheKey(entry.getKey()));
assertEquals(entry.getValue(), value);
}
assertEquals(randomKeys, ehcacheTest.stats().getTotalEntries());
assertEquals(randomKeys, ehcacheTest.stats().getTotalItems());
assertEquals(randomKeys, ehcacheTest.stats().getTotalHits());
assertEquals(expectedSize, ehcacheTest.stats().getTotalSizeInBytes());
assertEquals(randomKeys, ehcacheTest.count());
Expand Down Expand Up @@ -217,7 +217,7 @@ public void testConcurrentPut() throws Exception {
assertEquals(entry.getValue(), value);
}
assertEquals(randomKeys, ehcacheTest.count());
assertEquals(randomKeys, ehcacheTest.stats().getTotalEntries());
assertEquals(randomKeys, ehcacheTest.stats().getTotalItems());
ehcacheTest.close();
}
}
Expand Down Expand Up @@ -416,7 +416,7 @@ public String load(ICacheKey<String> key) {
assertEquals(1, numberOfTimesValueLoaded);
assertEquals(0, ((EhcacheDiskCache) ehcacheTest).getCompletableFutureMap().size());
assertEquals(1, ehcacheTest.stats().getTotalMisses());
assertEquals(1, ehcacheTest.stats().getTotalEntries());
assertEquals(1, ehcacheTest.stats().getTotalItems());
assertEquals(numberOfRequest - 1, ehcacheTest.stats().getTotalHits());
assertEquals(1, ehcacheTest.count());
ehcacheTest.close();
Expand Down
4 changes: 4 additions & 0 deletions rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@
"require_alias": {
"type": "boolean",
"description": "Sets require_alias for all incoming documents. Defaults to unset (false)"
},
"batch_size": {
"type": "int",
"description": "Sets the batch size"
}
},
"body":{
Expand Down
Loading

0 comments on commit f690458

Please sign in to comment.