diff --git a/.github/workflows/maintainer-approval.yml b/.github/workflows/maintainer-approval.yml index 34e8f57cc1878..fdc2bf16937b4 100644 --- a/.github/workflows/maintainer-approval.yml +++ b/.github/workflows/maintainer-approval.yml @@ -9,7 +9,7 @@ jobs: runs-on: ubuntu-latest steps: - id: find-maintainers - uses: actions/github-script@v7 + uses: actions/github-script@v7.0.1 with: github-token: ${{ secrets.GITHUB_TOKEN }} result-encoding: string diff --git a/.github/workflows/triage.yml b/.github/workflows/triage.yml index c305818bdb0a9..83bf4926a8c2d 100644 --- a/.github/workflows/triage.yml +++ b/.github/workflows/triage.yml @@ -9,7 +9,7 @@ jobs: if: github.repository == 'opensearch-project/OpenSearch' runs-on: ubuntu-latest steps: - - uses: actions/github-script@v7 + - uses: actions/github-script@v7.0.1 with: script: | const { issue, repository } = context.payload; diff --git a/.github/workflows/version.yml b/.github/workflows/version.yml index f4adef1ff06b0..be2a89ac931e9 100644 --- a/.github/workflows/version.yml +++ b/.github/workflows/version.yml @@ -129,7 +129,7 @@ jobs: - name: Create tracking issue id: create-issue - uses: actions/github-script@v6.4.0 + uses: actions/github-script@v7.0.1 with: script: | const body = ` diff --git a/CHANGELOG.md b/CHANGELOG.md index e05f8cf8c433e..b6221425cf4a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -137,15 +137,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - New DateTime format for RFC3339 compatible date fields ([#11465](https://github.com/opensearch-project/OpenSearch/pull/11465)) - Add support for Google Application Default Credentials in repository-gcs ([#8394](https://github.com/opensearch-project/OpenSearch/pull/8394)) - Remove concurrent segment search feature flag for GA launch ([#12074](https://github.com/opensearch-project/OpenSearch/pull/12074)) +- Enable Fuzzy codec for doc id fields using a bloom filter ([#11022](https://github.com/opensearch-project/OpenSearch/pull/11022)) - Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626)) - ### Dependencies - Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822)) - Bump Lucene from 9.7.0 to 9.8.0 ([10276](https://github.com/opensearch-project/OpenSearch/pull/10276)) - Bump `commons-io:commons-io` from 2.13.0 to 2.15.1 ([#10294](https://github.com/opensearch-project/OpenSearch/pull/10294), [#11001](https://github.com/opensearch-project/OpenSearch/pull/11001), [#11002](https://github.com/opensearch-project/OpenSearch/pull/11002), [#11446](https://github.com/opensearch-project/OpenSearch/pull/11446), [#11554](https://github.com/opensearch-project/OpenSearch/pull/11554), [#11560](https://github.com/opensearch-project/OpenSearch/pull/11560), [#11796](https://github.com/opensearch-project/OpenSearch/pull/11796)) - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) -- Bump `com.netflix.nebula.ospackage-base` from 11.4.0 to 11.6.0 ([#10295](https://github.com/opensearch-project/OpenSearch/pull/10295), [#11630](https://github.com/opensearch-project/OpenSearch/pull/11630)) +- Bump `com.netflix.nebula.ospackage-base` from 
11.4.0 to 11.8.0 ([#10295](https://github.com/opensearch-project/OpenSearch/pull/10295), [#11630](https://github.com/opensearch-project/OpenSearch/pull/11630), [#12167](https://github.com/opensearch-project/OpenSearch/pull/12167)) - Bump `org.apache.zookeeper:zookeeper` from 3.9.0 to 3.9.1 ([#10506](https://github.com/opensearch-project/OpenSearch/pull/10506)) - Bump `de.thetaphi:forbiddenapis` from 3.5.1 to 3.6 ([#10508](https://github.com/opensearch-project/OpenSearch/pull/10508)) - Bump `org.codehaus.woodstox:stax2-api` from 4.2.1 to 4.2.2 ([#10639](https://github.com/opensearch-project/OpenSearch/pull/10639)) @@ -155,7 +155,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `org.apache.logging.log4j:log4j-core` from 2.20.0 to 2.22.1 ([#10858](https://github.com/opensearch-project/OpenSearch/pull/10858), [#11000](https://github.com/opensearch-project/OpenSearch/pull/11000), [#11270](https://github.com/opensearch-project/OpenSearch/pull/11270), [#11695](https://github.com/opensearch-project/OpenSearch/pull/11695)) - Bump `aws-actions/configure-aws-credentials` from 2 to 4 ([#10504](https://github.com/opensearch-project/OpenSearch/pull/10504)) - Bump `stefanzweifel/git-auto-commit-action` from 4 to 5 ([#11171](https://github.com/opensearch-project/OpenSearch/pull/11171)) -- Bump `actions/github-script` from 6 to 7 ([#11271](https://github.com/opensearch-project/OpenSearch/pull/11271)) +- Bump `actions/github-script` from 6 to 7.0.1 ([#11271](https://github.com/opensearch-project/OpenSearch/pull/11271), [#12166](https://github.com/opensearch-project/OpenSearch/pull/12166)) - Bump `jackson` and `jackson_databind` from 2.15.2 to 2.16.0 ([#11273](https://github.com/opensearch-project/OpenSearch/pull/11273)) - Bump `netty` from 4.1.100.Final to 4.1.106.Final ([#11294](https://github.com/opensearch-project/OpenSearch/pull/11294), [#11775](https://github.com/opensearch-project/OpenSearch/pull/11775)), [#12034](https://github.com/opensearch-project/OpenSearch/pull/12034)) - Bump `com.avast.gradle:gradle-docker-compose-plugin` from 0.16.12 to 0.17.6 ([#10163](https://github.com/opensearch-project/OpenSearch/pull/10163), [#11692](https://github.com/opensearch-project/OpenSearch/pull/11692)) @@ -188,6 +188,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `reactor-core` from 3.5.11 to 3.5.14 ([#12042](https://github.com/opensearch-project/OpenSearch/pull/12042)) - Bump `com.google.http-client:google-http-client-jackson2` from 1.43.3 to 1.44.1 ([#12059](https://github.com/opensearch-project/OpenSearch/pull/12059)) - Bump `peter-evans/create-issue-from-file` from 4 to 5 ([#12057](https://github.com/opensearch-project/OpenSearch/pull/12057)) +- Bump `org.gradle.test-retry` from 1.5.4 to 1.5.8 ([#12168](https://github.com/opensearch-project/OpenSearch/pull/12168)) ### Changed - Mute the query profile IT with concurrent execution ([#9840](https://github.com/opensearch-project/OpenSearch/pull/9840)) diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterConstructionBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterConstructionBenchmark.java new file mode 100644 index 0000000000000..4e995f5a5067c --- /dev/null +++ 
b/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterConstructionBenchmark.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.benchmark.index.codec.fuzzy; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.UUIDs; +import org.opensearch.index.codec.fuzzy.FuzzySet; +import org.opensearch.index.codec.fuzzy.FuzzySetFactory; +import org.opensearch.index.codec.fuzzy.FuzzySetParameters; +import org.opensearch.index.mapper.IdFieldMapper; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@Fork(3) +@Warmup(iterations = 2) +@Measurement(iterations = 5, time = 60, timeUnit = TimeUnit.SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class FilterConstructionBenchmark { + + private List items; + + @Param({ "1000000", "10000000", "50000000" }) + private int numIds; + + @Param({ "0.0511", "0.1023", "0.2047" }) + private double fpp; + + private FuzzySetFactory fuzzySetFactory; + private String fieldName; + + @Setup + public void setupIds() { + this.fieldName = IdFieldMapper.NAME; + this.items = IntStream.range(0, numIds).mapToObj(i -> new BytesRef(UUIDs.base64UUID())).collect(Collectors.toList()); + FuzzySetParameters parameters = new FuzzySetParameters(() -> fpp); + this.fuzzySetFactory = new FuzzySetFactory(Map.of(fieldName, parameters)); + } + + @Benchmark + public FuzzySet buildFilter() throws IOException { + return fuzzySetFactory.createFuzzySet(items.size(), fieldName, () -> items.iterator()); + } +} diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterLookupBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterLookupBenchmark.java new file mode 100644 index 0000000000000..383539219830e --- /dev/null +++ b/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterLookupBenchmark.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.benchmark.index.codec.fuzzy; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.UUIDs; +import org.opensearch.index.codec.fuzzy.FuzzySet; +import org.opensearch.index.codec.fuzzy.FuzzySetFactory; +import org.opensearch.index.codec.fuzzy.FuzzySetParameters; +import org.opensearch.index.mapper.IdFieldMapper; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@Fork(3) +@Warmup(iterations = 2) +@Measurement(iterations = 5, time = 60, timeUnit = TimeUnit.SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class FilterLookupBenchmark { + + @Param({ "50000000", "1000000" }) + private int numItems; + + @Param({ "1000000" }) + private int searchKeyCount; + + @Param({ "0.0511", "0.1023", "0.2047" }) + private double fpp; + + private FuzzySet fuzzySet; + private List items; + private Random random = new Random(); + + @Setup + public void setupFilter() throws IOException { + String fieldName = IdFieldMapper.NAME; + items = IntStream.range(0, numItems).mapToObj(i -> new BytesRef(UUIDs.base64UUID())).collect(Collectors.toList()); + FuzzySetParameters parameters = new FuzzySetParameters(() -> fpp); + fuzzySet = new FuzzySetFactory(Map.of(fieldName, parameters)).createFuzzySet(numItems, fieldName, () -> items.iterator()); + } + + @Benchmark + public void contains_withExistingKeys(Blackhole blackhole) throws IOException { + for (int i = 0; i < searchKeyCount; i++) { + blackhole.consume(fuzzySet.contains(items.get(random.nextInt(items.size()))) == FuzzySet.Result.MAYBE); + } + } + + @Benchmark + public void contains_withRandomKeys(Blackhole blackhole) throws IOException { + for (int i = 0; i < searchKeyCount; i++) { + blackhole.consume(fuzzySet.contains(new BytesRef(UUIDs.base64UUID()))); + } + } +} diff --git a/build.gradle b/build.gradle index 375ab91e99e94..6f9aa0ea9e439 100644 --- a/build.gradle +++ b/build.gradle @@ -55,7 +55,7 @@ plugins { id 'opensearch.docker-support' id 'opensearch.global-build-info' id "com.diffplug.spotless" version "6.25.0" apply false - id "org.gradle.test-retry" version "1.5.4" apply false + id "org.gradle.test-retry" version "1.5.8" apply false id "test-report-aggregation" id 'jacoco-report-aggregation' } diff --git a/distribution/packages/build.gradle b/distribution/packages/build.gradle index ededa7bff34d8..43c38c5ad0c67 100644 --- a/distribution/packages/build.gradle +++ b/distribution/packages/build.gradle @@ -63,7 +63,7 @@ import java.util.regex.Pattern */ plugins { - id "com.netflix.nebula.ospackage-base" version "11.6.0" + id "com.netflix.nebula.ospackage-base" version "11.8.0" } void addProcessFilesTask(String type, boolean jdk) { diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 
f1d76d80bbfa3..82a4add334a7d 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -11,7 +11,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-all.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionSha256Sum=c16d517b50dd28b3f5838f0e844b7520b8f1eb610f2f29de7e4e04a1b7c9c79b +distributionSha256Sum=85719317abd2112f021d4f41f09ec370534ba288432065f4b477b6a3b652910d diff --git a/qa/rolling-upgrade/build.gradle b/qa/rolling-upgrade/build.gradle index 3dff452be855f..777377f04e8b9 100644 --- a/qa/rolling-upgrade/build.gradle +++ b/qa/rolling-upgrade/build.gradle @@ -62,6 +62,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { setting 'repositories.url.allowed_urls', 'http://snapshot.test*' setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" setting 'http.content_type.required', 'true' + systemProperty 'opensearch.experimental.optimize_doc_id_lookup.fuzzy_set.enabled', 'true' } } diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java index 1577260e145d4..08a5e92fc2d02 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java @@ -40,10 +40,10 @@ import org.opensearch.common.Booleans; import org.opensearch.common.io.Streams; import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.EngineConfig; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.rest.yaml.ObjectPath; import java.io.IOException; @@ -344,6 +344,92 @@ public void testIndexingWithSegRep() throws Exception { } } + public void testIndexingWithFuzzyFilterPostings() throws Exception { + if (UPGRADE_FROM_VERSION.onOrBefore(Version.V_2_11_1)) { + logger.info("--> Skip test for version {} where fuzzy filter postings format feature is not available", UPGRADE_FROM_VERSION); + return; + } + final String indexName = "test-index-fuzzy-set"; + final int shardCount = 3; + final int replicaCount = 1; + logger.info("--> Case {}", CLUSTER_TYPE); + printClusterNodes(); + logger.info("--> _cat/shards before test execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity())); + switch (CLUSTER_TYPE) { + case OLD: + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shardCount) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), replicaCount) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put( + EngineConfig.INDEX_CODEC_SETTING.getKey(), + randomFrom(new ArrayList<>(CODECS) { + { + add(CodecService.LUCENE_DEFAULT_CODEC); + } + }) + ) + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms"); + createIndex(indexName, settings.build()); + waitForClusterHealthWithNoShardMigration(indexName, "green"); + bulk(indexName, "_OLD", 5); + break; + case MIXED: + waitForClusterHealthWithNoShardMigration(indexName, "yellow"); + break; + case UPGRADED: + Settings.Builder settingsBuilder = Settings.builder() + 
.put(IndexSettings.INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING.getKey(), true); + updateIndexSettings(indexName, settingsBuilder); + waitForClusterHealthWithNoShardMigration(indexName, "green"); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + + int expectedCount; + switch (CLUSTER_TYPE) { + case OLD: + expectedCount = 5; + break; + case MIXED: + if (Booleans.parseBoolean(System.getProperty("tests.first_round"))) { + expectedCount = 5; + } else { + expectedCount = 10; + } + break; + case UPGRADED: + expectedCount = 15; + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + + waitForSearchableDocs(indexName, shardCount, replicaCount); + assertCount(indexName, expectedCount); + + if (CLUSTER_TYPE != ClusterType.OLD) { + bulk(indexName, "_" + CLUSTER_TYPE, 5); + logger.info("--> Index one doc (to be deleted next) and verify doc count"); + Request toBeDeleted = new Request("PUT", "/" + indexName + "/_doc/to_be_deleted"); + toBeDeleted.addParameter("refresh", "true"); + toBeDeleted.setJsonEntity("{\"f1\": \"delete-me\"}"); + client().performRequest(toBeDeleted); + waitForSearchableDocs(indexName, shardCount, replicaCount); + assertCount(indexName, expectedCount + 6); + + logger.info("--> Delete previously added doc and verify doc count"); + Request delete = new Request("DELETE", "/" + indexName + "/_doc/to_be_deleted"); + delete.addParameter("refresh", "true"); + client().performRequest(delete); + waitForSearchableDocs(indexName, shardCount, replicaCount); + assertCount(indexName, expectedCount + 5); + + //forceMergeAndVerify(indexName, shardCount * (1 + replicaCount)); + } + } + public void testAutoIdWithOpTypeCreate() throws IOException { final String indexName = "auto_id_and_op_type_create_index"; StringBuilder b = new StringBuilder(); diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java index a081110e6c5a1..f50e8fd0a38cf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java @@ -50,6 +50,8 @@ import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; import org.opensearch.test.VersionUtils; +import java.util.concurrent.ExecutionException; + import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; @@ -130,4 +132,61 @@ public void testCreateCloneIndex() { } + public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException { + Version version = VersionUtils.randomIndexCompatibleVersion(random()); + int numPrimaryShards = 1; + prepareCreate("source").setSettings( + Settings.builder().put(indexSettings()).put("number_of_shards", numPrimaryShards).put("index.version.created", version) + ).get(); + final int docs = 2; + for (int i = 0; i < docs; i++) { + client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get(); + } + internalCluster().ensureAtLeastNumDataNodes(2); + // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node + // if we change the setting too 
quickly we will end up with one replica unassigned which can't be assigned anymore due + // to the require._name below. + ensureGreen(); + // relocate all shards to one node such that we can merge it. + client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get(); + ensureGreen(); + + // disable rebalancing to be able to capture the right stats. balancing can move the target primary + // making it hard to pin point the source shards. + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")) + .get(); + try { + setFailRate(REPOSITORY_NAME, 100); + + client().admin() + .indices() + .prepareResizeIndex("source", "target") + .setResizeType(ResizeType.CLONE) + .setWaitForActiveShards(0) + .setSettings(Settings.builder().put("index.number_of_replicas", 0).putNull("index.blocks.write").build()) + .get(); + + Thread.sleep(2000); + ensureYellow("target"); + + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } finally { + setFailRate(REPOSITORY_NAME, 0); + ensureGreen(); + // clean up + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null) + ) + .get(); + } + + } + } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 751de66a97806..e43ff9a412784 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -9,6 +9,8 @@ package org.opensearch.remotestore; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; +import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.indices.get.GetIndexRequest; import org.opensearch.action.admin.indices.get.GetIndexResponse; import org.opensearch.action.bulk.BulkItemResponse; @@ -37,7 +39,7 @@ import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.After; @@ -60,6 +62,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { @@ -146,6 +149,18 @@ protected Settings nodeSettings(int nodeOrdinal) { } } + protected void 
setFailRate(String repoName, int value) throws ExecutionException, InterruptedException { + GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName }); + GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get(); + RepositoryMetadata rmd = res.repositories().get(0); + Settings.Builder settings = Settings.builder() + .put("location", rmd.settings().get("location")) + .put(REPOSITORIES_FAILRATE_SETTING.getKey(), value); + assertAcked( + client().admin().cluster().preparePutRepository(repoName).setType(ReloadableFsRepository.TYPE).setSettings(settings).get() + ); + } + public Settings indexSettings() { return defaultIndexSettings(); } @@ -224,10 +239,10 @@ public static Settings buildRemoteStoreNodeAttributes( return buildRemoteStoreNodeAttributes( segmentRepoName, segmentRepoPath, - FsRepository.TYPE, + ReloadableFsRepository.TYPE, translogRepoName, translogRepoPath, - FsRepository.TYPE, + ReloadableFsRepository.TYPE, withRateLimiterAttributes ); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index dfa5528eafcf2..94acf2b1dbb27 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -24,6 +24,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; @@ -479,7 +480,14 @@ public void testRateLimitedRemoteDownloads() throws Exception { settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue())); settings.put("location", segmentRepoPath).put("max_remote_download_bytes_per_sec", 4, ByteSizeUnit.KB); - assertAcked(client().admin().cluster().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(settings).get()); + assertAcked( + client().admin() + .cluster() + .preparePutRepository(REPOSITORY_NAME) + .setType(ReloadableFsRepository.TYPE) + .setSettings(settings) + .get() + ); for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { Repository segmentRepo = repositoriesService.repository(REPOSITORY_NAME); @@ -508,7 +516,14 @@ public void testRateLimitedRemoteDownloads() throws Exception { // revert repo metadata to pass asserts on repo metadata vs. 
node attrs during teardown // https://github.com/opensearch-project/OpenSearch/pull/9569#discussion_r1345668700 settings.remove("max_remote_download_bytes_per_sec"); - assertAcked(client().admin().cluster().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(settings).get()); + assertAcked( + client().admin() + .cluster() + .preparePutRepository(REPOSITORY_NAME) + .setType(ReloadableFsRepository.TYPE) + .setSettings(settings) + .get() + ); for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { Repository segmentRepo = repositoriesService.repository(REPOSITORY_NAME); assertNull(segmentRepo.getMetadata().settings().get("max_remote_download_bytes_per_sec")); diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SystemRepositoryIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SystemRepositoryIT.java index f50fc691fb232..28b84655a2cc7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SystemRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SystemRepositoryIT.java @@ -12,7 +12,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.repositories.RepositoryException; -import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Before; @@ -53,7 +53,7 @@ public void testRestrictedSettingsCantBeUpdated() { assertEquals( e.getMessage(), "[system-repo-name] trying to modify an unmodifiable attribute type of system " - + "repository from current value [fs] to new value [mock]" + + "repository from current value [reloadable-fs] to new value [mock]" ); } @@ -65,7 +65,12 @@ public void testSystemRepositoryNonRestrictedSettingsCanBeUpdated() { final Settings.Builder repoSettings = Settings.builder().put("location", absolutePath).put("chunk_size", new ByteSizeValue(20)); assertAcked( - client.admin().cluster().preparePutRepository(systemRepoName).setType(FsRepository.TYPE).setSettings(repoSettings).get() + client.admin() + .cluster() + .preparePutRepository(systemRepoName) + .setType(ReloadableFsRepository.TYPE) + .setSettings(repoSettings) + .get() ); } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 14b915935062c..0c97d62c44a5e 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -293,6 +293,7 @@ public void apply(Settings value, Settings current, Settings previous) { RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING, + RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING, diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index 
44dc4161f093a..e6d7ba0c60772 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -34,6 +34,7 @@ protected FeatureFlagSettings( FeatureFlags.IDENTITY_SETTING, FeatureFlags.TELEMETRY_SETTING, FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING, - FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING + FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING, + FeatureFlags.DOC_ID_FUZZY_SET_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 97d6a0ddf02c8..cf502fe685fcf 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -229,6 +229,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING, IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING, + IndexSettings.INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING, + IndexSettings.INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, + // Settings for concurrent segment search IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING, diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index c88a795501ca6..075dc9934e130 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -54,6 +54,11 @@ public class FeatureFlags { */ public static final String WRITEABLE_REMOTE_INDEX = "opensearch.experimental.feature.writeable_remote_index.enabled"; + /** + * Gates the optimization to enable bloom filters for doc id lookup. + */ + public static final String DOC_ID_FUZZY_SET = "opensearch.experimental.optimize_doc_id_lookup.fuzzy_set.enabled"; + /** * Should store the settings from opensearch.yml. 
*/ @@ -110,4 +115,6 @@ public static boolean isEnabled(Setting featureFlag) { false, Property.NodeScope ); + + public static final Setting DOC_ID_FUZZY_SET_SETTING = Setting.boolSetting(DOC_ID_FUZZY_SET, false, Property.NodeScope); } diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 00e765d73f77f..43a3a49418fbf 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -65,7 +65,9 @@ import java.util.function.UnaryOperator; import static org.opensearch.Version.V_2_7_0; +import static org.opensearch.common.util.FeatureFlags.DOC_ID_FUZZY_SET_SETTING; import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY; +import static org.opensearch.index.codec.fuzzy.FuzzySetParameters.DEFAULT_FALSE_POSITIVE_PROBABILITY; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING; @@ -658,6 +660,22 @@ public static IndexMergePolicy fromString(String text) { Property.Dynamic ); + public static final Setting INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING = Setting.boolSetting( + "index.optimize_doc_id_lookup.fuzzy_set.enabled", + false, + Property.IndexScope, + Property.Dynamic + ); + + public static final Setting INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING = Setting.doubleSetting( + "index.optimize_doc_id_lookup.fuzzy_set.false_positive_probability", + DEFAULT_FALSE_POSITIVE_PROBABILITY, + 0.01, + 0.50, + Property.IndexScope, + Property.Dynamic + ); + public static final TimeValue DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL = new TimeValue(650, TimeUnit.MILLISECONDS); public static final TimeValue MINIMUM_REMOTE_TRANSLOG_BUFFER_INTERVAL = TimeValue.ZERO; public static final Setting INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting( @@ -787,6 +805,16 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { */ private volatile UnaryOperator mergeOnFlushPolicy; + /** + * Is fuzzy set enabled for doc id + */ + private volatile boolean enableFuzzySetForDocId; + + /** + * False positive probability to use while creating fuzzy set. + */ + private volatile double docIdFuzzySetFalsePositiveProbability; + /** * Returns the default search fields for this index. */ @@ -926,6 +954,13 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti * Now this sortField (IndexSort) is stored in SegmentInfo and we need to maintain backward compatibility for them. 
*/ widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0); + + boolean isOptimizeDocIdLookupUsingFuzzySetFeatureEnabled = FeatureFlags.isEnabled(DOC_ID_FUZZY_SET_SETTING); + if (isOptimizeDocIdLookupUsingFuzzySetFeatureEnabled) { + enableFuzzySetForDocId = scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING); + docIdFuzzySetFalsePositiveProbability = scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING); + } + scopedSettings.addSettingsUpdateConsumer( TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING, tieredMergePolicyProvider::setNoCFSRatio @@ -1032,6 +1067,11 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this::setRemoteTranslogUploadBufferInterval ); scopedSettings.addSettingsUpdateConsumer(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, this::setRemoteTranslogKeepExtraGen); + scopedSettings.addSettingsUpdateConsumer(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING, this::setEnableFuzzySetForDocId); + scopedSettings.addSettingsUpdateConsumer( + INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, + this::setDocIdFuzzySetFalsePositiveProbability + ); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { @@ -1801,4 +1841,36 @@ public void setDefaultSearchPipeline(String defaultSearchPipeline) { public boolean shouldWidenIndexSortType() { return this.widenIndexSortType; } + + public boolean isEnableFuzzySetForDocId() { + return enableFuzzySetForDocId; + } + + public void setEnableFuzzySetForDocId(boolean enableFuzzySetForDocId) { + verifyFeatureToSetDocIdFuzzySetSetting(enabled -> this.enableFuzzySetForDocId = enabled, enableFuzzySetForDocId); + } + + public double getDocIdFuzzySetFalsePositiveProbability() { + return docIdFuzzySetFalsePositiveProbability; + } + + public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePositiveProbability) { + verifyFeatureToSetDocIdFuzzySetSetting( + fpp -> this.docIdFuzzySetFalsePositiveProbability = fpp, + docIdFuzzySetFalsePositiveProbability + ); + } + + private static void verifyFeatureToSetDocIdFuzzySetSetting(Consumer settingUpdater, T val) { + if (FeatureFlags.isEnabled(DOC_ID_FUZZY_SET_SETTING)) { + settingUpdater.accept(val); + } else { + throw new IllegalArgumentException( + "Fuzzy set for optimizing doc id lookup " + + "cannot be enabled with feature flag [" + + FeatureFlags.DOC_ID_FUZZY_SET + + "] set to false" + ); + } + } } diff --git a/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java b/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java index dc28ad2d6dc07..1ad17f121560c 100644 --- a/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java +++ b/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java @@ -39,10 +39,16 @@ import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat; import org.apache.lucene.codecs.lucene99.Lucene99Codec; import org.opensearch.common.lucene.Lucene; +import org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat; +import org.opensearch.index.codec.fuzzy.FuzzySetFactory; +import org.opensearch.index.codec.fuzzy.FuzzySetParameters; import org.opensearch.index.mapper.CompletionFieldMapper; +import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.mapper.MapperService; +import java.util.Map; + /** * {@link PerFieldMappingPostingFormatCodec This postings format} is the 
default * {@link PostingsFormat} for OpenSearch. It utilizes the @@ -57,6 +63,8 @@ public class PerFieldMappingPostingFormatCodec extends Lucene99Codec { private final Logger logger; private final MapperService mapperService; private final DocValuesFormat dvFormat = new Lucene90DocValuesFormat(); + private final FuzzySetFactory fuzzySetFactory; + private PostingsFormat docIdPostingsFormat; static { assert Codec.forName(Lucene.LATEST_CODEC).getClass().isAssignableFrom(PerFieldMappingPostingFormatCodec.class) @@ -67,6 +75,12 @@ public PerFieldMappingPostingFormatCodec(Mode compressionMode, MapperService map super(compressionMode); this.mapperService = mapperService; this.logger = logger; + fuzzySetFactory = new FuzzySetFactory( + Map.of( + IdFieldMapper.NAME, + new FuzzySetParameters(() -> mapperService.getIndexSettings().getDocIdFuzzySetFalsePositiveProbability()) + ) + ); } @Override @@ -76,6 +90,11 @@ public PostingsFormat getPostingsFormatForField(String field) { logger.warn("no index mapper found for field: [{}] returning default postings format", field); } else if (fieldType instanceof CompletionFieldMapper.CompletionFieldType) { return CompletionFieldMapper.CompletionFieldType.postingsFormat(); + } else if (IdFieldMapper.NAME.equals(field) && mapperService.getIndexSettings().isEnableFuzzySetForDocId()) { + if (docIdPostingsFormat == null) { + docIdPostingsFormat = new FuzzyFilterPostingsFormat(super.getPostingsFormatForField(field), fuzzySetFactory); + } + return docIdPostingsFormat; } return super.getPostingsFormatForField(field); } diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/AbstractFuzzySet.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/AbstractFuzzySet.java new file mode 100644 index 0000000000000..09976297361fa --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/AbstractFuzzySet.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.hash.T1ha1; + +import java.io.IOException; +import java.util.Iterator; + +/** + * Encapsulates common behaviour implementation for a fuzzy set. + */ +public abstract class AbstractFuzzySet implements FuzzySet { + + /** + * Add an item to this fuzzy set. + * @param value The value to be added + */ + protected abstract void add(BytesRef value); + + /** + * Add all items to the underlying set. + * Implementations can choose to perform this using an optimized strategy based on the type of set. + * @param valuesIteratorProvider Supplier for an iterator over All values which should be added to the set. 
+     */
+    protected void addAll(CheckedSupplier<Iterator<BytesRef>, IOException> valuesIteratorProvider) throws IOException {
+        Iterator<BytesRef> values = valuesIteratorProvider.get();
+        while (values.hasNext()) {
+            add(values.next());
+        }
+    }
+
+    public Result contains(BytesRef val) {
+        return containsHash(generateKey(val));
+    }
+
+    protected abstract Result containsHash(long hash);
+
+    protected long generateKey(BytesRef value) {
+        return T1ha1.hash(value.bytes, value.offset, value.length, 0L);
+    }
+
+    protected void assertAllElementsExist(CheckedSupplier<Iterator<BytesRef>, IOException> iteratorProvider) throws IOException {
+        Iterator<BytesRef> iter = iteratorProvider.get();
+        int cnt = 0;
+        while (iter.hasNext()) {
+            BytesRef item = iter.next();
+            assert contains(item) == Result.MAYBE
+                : "Expected Filter to return positive response for elements added to it. Elements matched: " + cnt;
+            cnt++;
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/BloomFilter.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/BloomFilter.java
new file mode 100644
index 0000000000000..b8a8352183ca8
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/BloomFilter.java
@@ -0,0 +1,150 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/*
+ * Based on code from the Apache Lucene project (https://github.com/apache/lucene) under the Apache License, version 2.0.
+ * Copyright 2001-2022 The Apache Software Foundation
+ * Modifications (C) OpenSearch Contributors. All Rights Reserved.
+ */
+
+package org.opensearch.index.codec.fuzzy;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.opensearch.common.CheckedSupplier;
+import org.opensearch.common.util.io.IOUtils;
+import org.opensearch.core.Assertions;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * The code is based on Lucene's implementation of Bloom Filter.
+ * It represents a subset of the Lucene implementation needed for OpenSearch use cases.
+ * Since the Lucene implementation is marked experimental,
+ * this aims to ensure we can provide a bwc implementation during upgrades.
+ */
+public class BloomFilter extends AbstractFuzzySet {
+
+    private static final Logger logger = LogManager.getLogger(BloomFilter.class);
+
+    // The sizes of BitSet used are all numbers that, when expressed in binary form,
+    // are all ones. This is to enable fast downsizing from one bitset to another
+    // by simply ANDing each set index in one bitset with the size of the target bitset
+    // - this provides a fast modulo of the number. Values previously accumulated in
+    // a large bitset and then mapped to a smaller set can be looked up using a single
+    // AND operation of the query term's hash rather than needing to perform a 2-step
+    // translation of the query term that mirrors the stored content's reprojections.
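A minimal worked example of the masking trick described in the comment above (hypothetical values, not taken from the change itself): because every usable set size has the form 2^n - 1, a bitwise AND with the size behaves like a modulo by 2^n.

```java
int setSize = (1 << 6) - 1;              // 63, binary 111111 - the smallest "all ones" size
int hash = 200;                          // any non-negative probe value
assert (hash & setSize) == (hash % 64);  // 8 == 8: the AND keeps only the low 6 bits
```

The add() and mayContainValue() methods below rely on this identity, so each probe costs a single AND instead of an explicit modulo.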
+    static final int[] usableBitSetSizes;
+
+    static {
+        usableBitSetSizes = new int[26];
+        for (int i = 0; i < usableBitSetSizes.length; i++) {
+            usableBitSetSizes[i] = (1 << (i + 6)) - 1;
+        }
+    }
+
+    private final LongArrayBackedBitSet bitset;
+    private final int setSize;
+    private final int hashCount;
+
+    BloomFilter(long maxDocs, double maxFpp, CheckedSupplier<Iterator<BytesRef>, IOException> fieldIteratorProvider) throws IOException {
+        int setSize = (int) Math.ceil((maxDocs * Math.log(maxFpp)) / Math.log(1 / Math.pow(2, Math.log(2))));
+        setSize = getNearestSetSize(setSize < Integer.MAX_VALUE / 2 ? 2 * setSize : Integer.MAX_VALUE);
+        int optimalK = (int) Math.round(((double) setSize / maxDocs) * Math.log(2));
+        this.bitset = new LongArrayBackedBitSet(setSize);
+        this.setSize = setSize;
+        this.hashCount = optimalK;
+        addAll(fieldIteratorProvider);
+        if (Assertions.ENABLED) {
+            assertAllElementsExist(fieldIteratorProvider);
+        }
+        logger.debug("Bloom filter created with fpp: {}, setSize: {}, hashCount: {}", maxFpp, setSize, hashCount);
+    }
+
+    BloomFilter(IndexInput in) throws IOException {
+        hashCount = in.readInt();
+        setSize = in.readInt();
+        this.bitset = new LongArrayBackedBitSet(in);
+    }
+
+    @Override
+    public void writeTo(DataOutput out) throws IOException {
+        out.writeInt(hashCount);
+        out.writeInt(setSize);
+        bitset.writeTo(out);
+    }
+
+    private static int getNearestSetSize(int maxNumberOfBits) {
+        assert maxNumberOfBits > 0 : "Provided size estimate for bloom filter is illegal (<=0) : " + maxNumberOfBits;
+        int result = usableBitSetSizes[0];
+        for (int i = 0; i < usableBitSetSizes.length; i++) {
+            if (usableBitSetSizes[i] <= maxNumberOfBits) {
+                result = usableBitSetSizes[i];
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public SetType setType() {
+        return SetType.BLOOM_FILTER_V1;
+    }
+
+    @Override
+    public Result containsHash(long hash) {
+        int msb = (int) (hash >>> Integer.SIZE);
+        int lsb = (int) hash;
+        for (int i = 0; i < hashCount; i++) {
+            int bloomPos = (lsb + i * msb);
+            if (!mayContainValue(bloomPos)) {
+                return Result.NO;
+            }
+        }
+        return Result.MAYBE;
+    }
+
+    protected void add(BytesRef value) {
+        long hash = generateKey(value);
+        int msb = (int) (hash >>> Integer.SIZE);
+        int lsb = (int) hash;
+        for (int i = 0; i < hashCount; i++) {
+            // Bitmasking using bloomSize is effectively a modulo operation since set sizes are always of the form (2^n - 1)
+            int bloomPos = (lsb + i * msb) & setSize;
+            bitset.set(bloomPos);
+        }
+    }
+
+    @Override
+    public boolean isSaturated() {
+        long numBitsSet = bitset.cardinality();
+        // Don't bother saving bitsets if >90% of bits are set - we don't want to
+        // throw any more memory at this problem.
+ return (float) numBitsSet / (float) setSize > 0.9f; + } + + @Override + public long ramBytesUsed() { + return RamUsageEstimator.sizeOf(bitset.ramBytesUsed()); + } + + private boolean mayContainValue(int aHash) { + // Bloom sizes are always base 2 and so can be ANDed for a fast modulo + int pos = aHash & setSize; + return bitset.get(pos); + } + + @Override + public void close() throws IOException { + IOUtils.close(bitset); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormat.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormat.java new file mode 100644 index 0000000000000..01f8054fc91be --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormat.java @@ -0,0 +1,492 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Based on code from the Apache Lucene project (https://github.com/apache/lucene) under the Apache License, version 2.0. + * Copyright 2001-2022 The Apache Software Foundation + * Modifications (C) OpenSearch Contributors. All Rights Reserved. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.FieldsConsumer; +import org.apache.lucene.codecs.FieldsProducer; +import org.apache.lucene.codecs.NormsProducer; +import org.apache.lucene.codecs.PostingsFormat; +import org.apache.lucene.index.BaseTermsEnum; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.ImpactsEnum; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.automaton.CompiledAutomaton; +import org.opensearch.common.util.io.IOUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Based on Lucene's BloomFilterPostingsFormat. + * Discussion with Lucene community based on which the decision to have this in OpenSearch code was taken + * is captured here: https://github.com/apache/lucene/issues/12986 + * + * The class deals with persisting the bloom filter through the postings format, + * and reading the field via a bloom filter fronted terms enum (to reduce disk seeks in case of absence of requested values) + * The class should be handled during lucene upgrades. There are bwc tests present to verify the format continues to work after upgrade. + */ + +public final class FuzzyFilterPostingsFormat extends PostingsFormat { + + private static final Logger logger = LogManager.getLogger(FuzzyFilterPostingsFormat.class); + + /** + * This name is stored in headers. If changing the implementation for the format, this name/version should be updated + * so that reads can work as expected. 
+ */ + public static final String FUZZY_FILTER_CODEC_NAME = "FuzzyFilterCodec99"; + + public static final int VERSION_START = 0; + public static final int VERSION_CURRENT = VERSION_START; + + /** Extension of Fuzzy Filters file */ + public static final String FUZZY_FILTER_FILE_EXTENSION = "fzd"; + + private final PostingsFormat delegatePostingsFormat; + private final FuzzySetFactory fuzzySetFactory; + + public FuzzyFilterPostingsFormat(PostingsFormat delegatePostingsFormat, FuzzySetFactory fuzzySetFactory) { + super(FUZZY_FILTER_CODEC_NAME); + this.delegatePostingsFormat = delegatePostingsFormat; + this.fuzzySetFactory = fuzzySetFactory; + } + + // Needed for SPI + public FuzzyFilterPostingsFormat() { + this(null, null); + } + + @Override + public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException { + if (delegatePostingsFormat == null) { + throw new UnsupportedOperationException( + "Error - " + getClass().getName() + " has been constructed without a choice of PostingsFormat" + ); + } + FieldsConsumer fieldsConsumer = delegatePostingsFormat.fieldsConsumer(state); + return new FuzzyFilteredFieldsConsumer(fieldsConsumer, state); + } + + @Override + public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException { + return new FuzzyFilteredFieldsProducer(state); + } + + static class FuzzyFilteredFieldsProducer extends FieldsProducer { + private FieldsProducer delegateFieldsProducer; + HashMap fuzzySetsByFieldName = new HashMap<>(); + private List closeables = new ArrayList<>(); + + public FuzzyFilteredFieldsProducer(SegmentReadState state) throws IOException { + String fuzzyFilterFileName = IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + FUZZY_FILTER_FILE_EXTENSION + ); + IndexInput filterIn = null; + boolean success = false; + try { + // Using IndexInput directly instead of ChecksumIndexInput since we want to support RandomAccessInput + filterIn = state.directory.openInput(fuzzyFilterFileName, state.context); + + CodecUtil.checkIndexHeader( + filterIn, + FUZZY_FILTER_CODEC_NAME, + VERSION_START, + VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix + ); + // Load the delegate postings format + PostingsFormat delegatePostingsFormat = PostingsFormat.forName(filterIn.readString()); + this.delegateFieldsProducer = delegatePostingsFormat.fieldsProducer(state); + int numFilters = filterIn.readInt(); + for (int i = 0; i < numFilters; i++) { + int fieldNum = filterIn.readInt(); + FuzzySet set = FuzzySetFactory.deserializeFuzzySet(filterIn); + closeables.add(set); + FieldInfo fieldInfo = state.fieldInfos.fieldInfo(fieldNum); + fuzzySetsByFieldName.put(fieldInfo.name, set); + } + CodecUtil.retrieveChecksum(filterIn); + + // Can we disable it if we foresee performance issues? + CodecUtil.checksumEntireFile(filterIn); + success = true; + closeables.add(filterIn); + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(filterIn, delegateFieldsProducer); + } + } + } + + @Override + public Iterator iterator() { + return delegateFieldsProducer.iterator(); + } + + @Override + public void close() throws IOException { + // Why closing here? 
+ IOUtils.closeWhileHandlingException(closeables); + delegateFieldsProducer.close(); + } + + @Override + public Terms terms(String field) throws IOException { + FuzzySet filter = fuzzySetsByFieldName.get(field); + if (filter == null) { + return delegateFieldsProducer.terms(field); + } else { + Terms result = delegateFieldsProducer.terms(field); + if (result == null) { + return null; + } + return new FuzzyFilteredTerms(result, filter); + } + } + + @Override + public int size() { + return delegateFieldsProducer.size(); + } + + static class FuzzyFilteredTerms extends Terms { + private Terms delegateTerms; + private FuzzySet filter; + + public FuzzyFilteredTerms(Terms terms, FuzzySet filter) { + this.delegateTerms = terms; + this.filter = filter; + } + + @Override + public TermsEnum intersect(CompiledAutomaton compiled, final BytesRef startTerm) throws IOException { + return delegateTerms.intersect(compiled, startTerm); + } + + @Override + public TermsEnum iterator() throws IOException { + return new FilterAppliedTermsEnum(delegateTerms, filter); + } + + @Override + public long size() throws IOException { + return delegateTerms.size(); + } + + @Override + public long getSumTotalTermFreq() throws IOException { + return delegateTerms.getSumTotalTermFreq(); + } + + @Override + public long getSumDocFreq() throws IOException { + return delegateTerms.getSumDocFreq(); + } + + @Override + public int getDocCount() throws IOException { + return delegateTerms.getDocCount(); + } + + @Override + public boolean hasFreqs() { + return delegateTerms.hasFreqs(); + } + + @Override + public boolean hasOffsets() { + return delegateTerms.hasOffsets(); + } + + @Override + public boolean hasPositions() { + return delegateTerms.hasPositions(); + } + + @Override + public boolean hasPayloads() { + return delegateTerms.hasPayloads(); + } + + @Override + public BytesRef getMin() throws IOException { + return delegateTerms.getMin(); + } + + @Override + public BytesRef getMax() throws IOException { + return delegateTerms.getMax(); + } + } + + static final class FilterAppliedTermsEnum extends BaseTermsEnum { + + private Terms delegateTerms; + private TermsEnum delegateTermsEnum; + private final FuzzySet filter; + + public FilterAppliedTermsEnum(Terms delegateTerms, FuzzySet filter) throws IOException { + this.delegateTerms = delegateTerms; + this.filter = filter; + } + + void reset(Terms delegateTerms) throws IOException { + this.delegateTerms = delegateTerms; + this.delegateTermsEnum = null; + } + + private TermsEnum delegate() throws IOException { + if (delegateTermsEnum == null) { + /* pull the iterator only if we really need it - + * this can be a relativly heavy operation depending on the + * delegate postings format and the underlying directory + * (clone IndexInput) */ + delegateTermsEnum = delegateTerms.iterator(); + } + return delegateTermsEnum; + } + + @Override + public BytesRef next() throws IOException { + return delegate().next(); + } + + @Override + public boolean seekExact(BytesRef text) throws IOException { + // The magical fail-fast speed up that is the entire point of all of + // this code - save a disk seek if there is a match on an in-memory + // structure + // that may occasionally give a false positive but guaranteed no false + // negatives + if (filter.contains(text) == FuzzySet.Result.NO) { + return false; + } + return delegate().seekExact(text); + } + + @Override + public SeekStatus seekCeil(BytesRef text) throws IOException { + return delegate().seekCeil(text); + } + + @Override + public void 
seekExact(long ord) throws IOException { + delegate().seekExact(ord); + } + + @Override + public BytesRef term() throws IOException { + return delegate().term(); + } + + @Override + public long ord() throws IOException { + return delegate().ord(); + } + + @Override + public int docFreq() throws IOException { + return delegate().docFreq(); + } + + @Override + public long totalTermFreq() throws IOException { + return delegate().totalTermFreq(); + } + + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + return delegate().postings(reuse, flags); + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + return delegate().impacts(flags); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(filter=" + filter.toString() + ")"; + } + } + + @Override + public void checkIntegrity() throws IOException { + delegateFieldsProducer.checkIntegrity(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(fields=" + fuzzySetsByFieldName.size() + ",delegate=" + delegateFieldsProducer + ")"; + } + } + + class FuzzyFilteredFieldsConsumer extends FieldsConsumer { + private FieldsConsumer delegateFieldsConsumer; + private Map fuzzySets = new HashMap<>(); + private SegmentWriteState state; + private List closeables = new ArrayList<>(); + + public FuzzyFilteredFieldsConsumer(FieldsConsumer fieldsConsumer, SegmentWriteState state) { + this.delegateFieldsConsumer = fieldsConsumer; + this.state = state; + } + + @Override + public void write(Fields fields, NormsProducer norms) throws IOException { + + // Delegate must write first: it may have opened files + // on creating the class + // (e.g. Lucene41PostingsConsumer), and write() will + // close them; alternatively, if we delayed pulling + // the fields consumer until here, we could do it + // afterwards: + delegateFieldsConsumer.write(fields, norms); + + for (String field : fields) { + Terms terms = fields.terms(field); + if (terms == null) { + continue; + } + FieldInfo fieldInfo = state.fieldInfos.fieldInfo(field); + FuzzySet fuzzySet = fuzzySetFactory.createFuzzySet(state.segmentInfo.maxDoc(), fieldInfo.name, () -> iterator(terms)); + if (fuzzySet == null) { + break; + } + assert fuzzySets.containsKey(fieldInfo) == false; + closeables.add(fuzzySet); + fuzzySets.put(fieldInfo, fuzzySet); + } + } + + private Iterator iterator(Terms terms) throws IOException { + TermsEnum termIterator = terms.iterator(); + return new Iterator<>() { + + private BytesRef currentTerm; + private PostingsEnum postingsEnum; + + @Override + public boolean hasNext() { + try { + do { + currentTerm = termIterator.next(); + if (currentTerm == null) { + return false; + } + postingsEnum = termIterator.postings(postingsEnum, 0); + if (postingsEnum.nextDoc() != PostingsEnum.NO_MORE_DOCS) { + return true; + } + } while (true); + } catch (IOException ex) { + throw new IllegalStateException("Cannot read terms: " + termIterator.attributes()); + } + } + + @Override + public BytesRef next() { + return currentTerm; + } + }; + } + + private boolean closed; + + @Override + public void close() throws IOException { + if (closed) { + return; + } + closed = true; + delegateFieldsConsumer.close(); + + // Now we are done accumulating values for these fields + List> nonSaturatedSets = new ArrayList<>(); + + for (Map.Entry entry : fuzzySets.entrySet()) { + FuzzySet fuzzySet = entry.getValue(); + if (!fuzzySet.isSaturated()) { + nonSaturatedSets.add(entry); + } + } + String 
fuzzyFilterFileName = IndexFileNames.segmentFileName( + state.segmentInfo.name, + state.segmentSuffix, + FUZZY_FILTER_FILE_EXTENSION + ); + try (IndexOutput fuzzyFilterFileOutput = state.directory.createOutput(fuzzyFilterFileName, state.context)) { + logger.trace( + "Writing fuzzy filter postings with version: {} for segment: {}", + VERSION_CURRENT, + state.segmentInfo.toString() + ); + CodecUtil.writeIndexHeader( + fuzzyFilterFileOutput, + FUZZY_FILTER_CODEC_NAME, + VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix + ); + + // remember the name of the postings format we will delegate to + fuzzyFilterFileOutput.writeString(delegatePostingsFormat.getName()); + + // First field in the output file is the number of fields+sets saved + fuzzyFilterFileOutput.writeInt(nonSaturatedSets.size()); + for (Map.Entry<FieldInfo, FuzzySet> entry : nonSaturatedSets) { + FieldInfo fieldInfo = entry.getKey(); + FuzzySet fuzzySet = entry.getValue(); + saveAppropriatelySizedFuzzySet(fuzzyFilterFileOutput, fuzzySet, fieldInfo); + } + CodecUtil.writeFooter(fuzzyFilterFileOutput); + } + // We are done with large bitsets so no need to keep them hanging around + fuzzySets.clear(); + IOUtils.closeWhileHandlingException(closeables); + } + + private void saveAppropriatelySizedFuzzySet(IndexOutput fileOutput, FuzzySet fuzzySet, FieldInfo fieldInfo) throws IOException { + fileOutput.writeInt(fieldInfo.number); + fileOutput.writeString(fuzzySet.setType().getSetName()); + fuzzySet.writeTo(fileOutput); + } + } + + @Override + public String toString() { + return "FuzzyFilterPostingsFormat(" + delegatePostingsFormat + ")"; + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySet.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySet.java new file mode 100644 index 0000000000000..df443ffbca33d --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySet.java @@ -0,0 +1,98 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.CheckedFunction; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Fuzzy Filter interface + */ +public interface FuzzySet extends Accountable, Closeable { + + /** + * Name used for a codec to be aware of what fuzzy set has been used. + */ + SetType setType(); + + /** + * @param value the item whose membership needs to be checked. + */ + Result contains(BytesRef value); + + boolean isSaturated(); + + void writeTo(DataOutput out) throws IOException; + + /** + * Enum to represent result of membership check on a fuzzy set. + */ + enum Result { + /** + * A definite no for the set membership of an item. + */ + NO, + + /** + * Fuzzy sets cannot guarantee whether a given item is present in the set, due to the data being stored in + * a lossy format (e.g. fingerprint, hash). + * Hence, we return a response denoting that the item may be present. + */ + MAYBE + } + + /** + * Enum to declare supported properties and mappings for a fuzzy set implementation. + */ + enum SetType { + BLOOM_FILTER_V1("bloom_filter_v1", BloomFilter::new, List.of("bloom_filter")); + + /** + * Name persisted in postings file.
This will be used when reading to determine the bloom filter implementation. + */ + private final String setName; + + /** + * Interface for reading the actual fuzzy set implementation into a Java object. + */ + private final CheckedFunction<IndexInput, ? extends FuzzySet, IOException> deserializer; + + SetType(String setName, CheckedFunction<IndexInput, ? extends FuzzySet, IOException> deserializer, List<String> aliases) { + if (aliases.size() < 1) { + throw new IllegalArgumentException("Alias list is empty. Could not create Set Type: " + setName); + } + this.setName = setName; + this.deserializer = deserializer; + } + + public String getSetName() { + return setName; + } + + public CheckedFunction<IndexInput, ? extends FuzzySet, IOException> getDeserializer() { + return deserializer; + } + + public static SetType from(String name) { + for (SetType type : SetType.values()) { + if (type.setName.equals(name)) { + return type; + } + } + throw new IllegalArgumentException("There is no implementation for fuzzy set: " + name); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetFactory.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetFactory.java new file mode 100644 index 0000000000000..5d1fd03f099d4 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetFactory.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.CheckedSupplier; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +/** + * Factory class to create fuzzy set. + * Supports bloom filters for now. More sets can be added as required. + */ +public class FuzzySetFactory { + + private final Map<String, FuzzySetParameters> setTypeForField; + + public FuzzySetFactory(Map<String, FuzzySetParameters> setTypeForField) { + this.setTypeForField = setTypeForField; + } + + public FuzzySet createFuzzySet(int maxDocs, String fieldName, CheckedSupplier<Iterator<BytesRef>, IOException> iteratorProvider) + throws IOException { + FuzzySetParameters params = setTypeForField.get(fieldName); + if (params == null) { + throw new IllegalArgumentException("No fuzzy set defined for field: " + fieldName); + } + switch (params.getSetType()) { + case BLOOM_FILTER_V1: + return new BloomFilter(maxDocs, params.getFalsePositiveProbability(), iteratorProvider); + default: + throw new IllegalArgumentException("No Implementation for set type: " + params.getSetType()); + } + } + + public static FuzzySet deserializeFuzzySet(IndexInput in) throws IOException { + FuzzySet.SetType setType = FuzzySet.SetType.from(in.readString()); + return setType.getDeserializer().apply(in); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetParameters.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetParameters.java new file mode 100644 index 0000000000000..7bb96e7c34f0b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetParameters.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import java.util.function.Supplier; + +/** + * Wrapper for params to create a fuzzy set.
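+ * Only the bloom filter set type is supported at present; the false positive probability is read from the supplier each time it is requested.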
+ */ +public class FuzzySetParameters { + private final Supplier falsePositiveProbabilityProvider; + private final FuzzySet.SetType setType; + + public static final double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.2047d; + + public FuzzySetParameters(Supplier falsePositiveProbabilityProvider) { + this.falsePositiveProbabilityProvider = falsePositiveProbabilityProvider; + this.setType = FuzzySet.SetType.BLOOM_FILTER_V1; + } + + public double getFalsePositiveProbability() { + return falsePositiveProbabilityProvider.get(); + } + + public FuzzySet.SetType getSetType() { + return setType; + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/IndexInputImmutableLongArray.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/IndexInputImmutableLongArray.java new file mode 100644 index 0000000000000..08d6059c1e82e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/IndexInputImmutableLongArray.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.RandomAccessInput; +import org.apache.lucene.util.RamUsageEstimator; +import org.opensearch.OpenSearchException; +import org.opensearch.common.util.LongArray; + +import java.io.IOException; + +/** + * A Long array backed by RandomAccessInput. + * This implementation supports read operations only. + */ +class IndexInputImmutableLongArray implements LongArray { + + private final RandomAccessInput input; + private final long size; + + IndexInputImmutableLongArray(long size, RandomAccessInput input) { + this.size = size; + this.input = input; + } + + @Override + public void close() {} + + @Override + public long size() { + return size; + } + + @Override + public synchronized long get(long index) { + try { + // Multiplying by 8 since each long is 8 bytes, and we need to get the long value at (index * 8) in the + // RandomAccessInput being accessed. + return input.readLong(index << 3); + } catch (IOException ex) { + throw new OpenSearchException(ex); + } + } + + @Override + public long set(long index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public long increment(long index, long inc) { + throw new UnsupportedOperationException(); + } + + @Override + public void fill(long fromIndex, long toIndex, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public long ramBytesUsed() { + return RamUsageEstimator.shallowSizeOfInstance(IndexInputImmutableLongArray.class); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/LongArrayBackedBitSet.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/LongArrayBackedBitSet.java new file mode 100644 index 0000000000000..bd4936aeec366 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/LongArrayBackedBitSet.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.Accountable; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.LongArray; +import org.opensearch.common.util.io.IOUtils; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A bitset backed by a long-indexed array. + */ +class LongArrayBackedBitSet implements Accountable, Closeable { + + private long underlyingArrayLength = 0L; + private LongArray longArray; + + /** + * Constructor which uses an on-heap array. This should be used during construction of the bitset. + * @param capacity The maximum capacity to provision for the bitset. + */ + LongArrayBackedBitSet(long capacity) { + // Since the bitset is backed by a long array, we only need 1 element for every 64 bits in the underlying array. + underlyingArrayLength = (capacity >> 6) + 1L; + this.longArray = BigArrays.NON_RECYCLING_INSTANCE.withCircuitBreaking().newLongArray(underlyingArrayLength); + } + + /** + * Constructor which uses Lucene's IndexInput to read the bitset into a read-only buffer. + * @param in IndexInput containing the serialized bitset. + * @throws IOException if reading from the underlying input fails + */ + LongArrayBackedBitSet(IndexInput in) throws IOException { + underlyingArrayLength = in.readLong(); + // Multiplying by 8 since the length above is of the long array, so we will have + // 8 times the number of bytes in our stream. + long streamLength = underlyingArrayLength << 3; + this.longArray = new IndexInputImmutableLongArray(underlyingArrayLength, in.randomAccessSlice(in.getFilePointer(), streamLength)); + in.skipBytes(streamLength); + } + + public void writeTo(DataOutput out) throws IOException { + out.writeLong(underlyingArrayLength); + for (int idx = 0; idx < underlyingArrayLength; idx++) { + out.writeLong(longArray.get(idx)); + } + } + + /** + * This is an O(n) operation, and will iterate over all the elements in the underlying long array + * to determine cardinality of the set. + * @return number of set bits in the bitset. + */ + public long cardinality() { + long tot = 0; + for (int i = 0; i < underlyingArrayLength; ++i) { + tot += Long.bitCount(longArray.get(i)); + } + return tot; + } + + /** + * Retrieves whether the bit is set or not at the given index. + * @param index the index to look up for the bit + * @return true if bit is set, false otherwise + */ + public boolean get(long index) { + long i = index >> 6; // div 64 + long val = longArray.get(i); + long bitmask = 1L << index; + return (val & bitmask) != 0; + } + + /** + * Sets the bit at the given index. + * @param index the index to set the bit at.
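+ * Note that only the low 6 bits of the index select the bit within the chosen word: Java masks the shift count of {@code 1L << index} to the 0-63 range for longs.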
+ */ + public void set(long index) { + long wordNum = index >> 6; // div 64 + long bitmask = 1L << index; + long val = longArray.get(wordNum); + longArray.set(wordNum, val | bitmask); + } + + @Override + public long ramBytesUsed() { + return 128L + longArray.ramBytesUsed(); + } + + @Override + public void close() throws IOException { + IOUtils.close(longArray); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/package-info.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/package-info.java new file mode 100644 index 0000000000000..7aeac68cd192a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** classes responsible for handling all fuzzy codecs and operations */ +package org.opensearch.index.codec.fuzzy; diff --git a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java index d4a97f0267222..34aecfc62b8b2 100644 --- a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java +++ b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.ReplicationStats; +import org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat; import org.opensearch.index.remote.RemoteSegmentStats; import java.io.IOException; @@ -95,7 +96,8 @@ public class SegmentsStats implements Writeable, ToXContentFragment { Map.entry("tvx", "Term Vector Index"), Map.entry("tvd", "Term Vector Documents"), Map.entry("tvf", "Term Vector Fields"), - Map.entry("liv", "Live Documents") + Map.entry("liv", "Live Documents"), + Map.entry(FuzzyFilterPostingsFormat.FUZZY_FILTER_FILE_EXTENSION, "Fuzzy Filter") ); public SegmentsStats() { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5834eabfa9af0..977155a1cbb72 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2025,23 +2025,35 @@ public RemoteSegmentStoreDirectory getRemoteDirectory() { } /** - Returns true iff it is able to verify that remote segment store - is in sync with local + * Returns true iff it is able to verify that remote segment store + * is in sync with local */ boolean isRemoteSegmentStoreInSync() { assert indexSettings.isRemoteStoreEnabled(); try { RemoteSegmentStoreDirectory directory = getRemoteDirectory(); if (directory.readLatestMetadataFile() != null) { - // verifying that all files except EXCLUDE_FILES are uploaded to the remote Collection uploadFiles = directory.getSegmentsUploadedToRemoteStore().keySet(); - SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); - Collection localFiles = segmentInfos.files(true); - if (uploadFiles.containsAll(localFiles)) { - return true; + try (GatedCloseable segmentInfosGatedCloseable = getSegmentInfosSnapshot()) { + Collection localSegmentInfosFiles = segmentInfosGatedCloseable.get().files(true); + Set localFiles = new HashSet<>(localSegmentInfosFiles); + // verifying that all files except EXCLUDE_FILES are uploaded to the remote + 
localFiles.removeAll(RemoteStoreRefreshListener.EXCLUDE_FILES); + if (uploadFiles.containsAll(localFiles)) { + return true; + } + logger.debug( + () -> new ParameterizedMessage( + "RemoteSegmentStoreSyncStatus localSize={} remoteSize={}", + localFiles.size(), + uploadFiles.size() + ) + ); + } + } + } - } catch (IOException e) { + } catch (AlreadyClosedException e) { + throw e; + } catch (Throwable e) { logger.error("Exception while reading latest metadata", e); } return false; diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index f75e86540ed9b..7bb80b736693f 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -172,13 +172,33 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) { // When the shouldSync is called the first time, then 1st condition on primary term is true. But after that // we update the primary term and the same condition would not evaluate to true again in syncSegments. // Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call. - || isRefreshAfterCommitSafe(); + || isRefreshAfterCommitSafe() + || isRemoteSegmentStoreInSync() == false; if (shouldSync || skipPrimaryTermCheck) { return shouldSync; } return this.primaryTerm != indexShard.getOperationPrimaryTerm(); } + /** + * Checks if all files present in the local store are uploaded to the remote store or are part of the excluded files. + * + * Different from IndexShard#isRemoteSegmentStoreInSync as it uses the uploaded-files cache in RemoteDirectory and + * doesn't make a remote store call. It also doesn't throw an exception if the store is getting closed, as the store + * is expected to be open when this is invoked. + * + * @return true iff all the local files are uploaded to the remote store. + */ + boolean isRemoteSegmentStoreInSync() { + try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { + return segmentInfosGatedCloseable.get().files(true).stream().allMatch(this::skipUpload); + } catch (Throwable throwable) { + logger.error("Throwable thrown during isRemoteSegmentStoreInSync", throwable); + } + return false; + } + + /* @return false if retry is needed */ diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 5b1940bb1d9a5..3faef2da05320 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -38,11 +38,13 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.Sort; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; import org.opensearch.action.StepListener; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; @@ -191,7 +193,8 @@ void recoverFromLocalShards( // just trigger a merge to do housekeeping on the // copied segments - we will also see them in stats etc.
indexShard.getEngine().forceMerge(false, -1, false, false, false, UUIDs.randomBase64UUID()); - if (indexShard.isRemoteTranslogEnabled()) { + if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) { + waitForRemoteStoreSync(indexShard); if (indexShard.isRemoteSegmentStoreInSync() == false) { throw new IndexShardRecoveryException( indexShard.shardId(), @@ -432,7 +435,8 @@ void recoverFromSnapshotAndRemoteStore( } indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); - if (indexShard.isRemoteTranslogEnabled()) { + if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) { + waitForRemoteStoreSync(indexShard); if (indexShard.isRemoteSegmentStoreInSync() == false) { listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store")); return; @@ -717,7 +721,8 @@ private void restore( } indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); - if (indexShard.isRemoteTranslogEnabled()) { + if (indexShard.isRemoteTranslogEnabled() && indexShard.shardRouting.primary()) { + waitForRemoteStoreSync(indexShard); if (indexShard.isRemoteSegmentStoreInSync() == false) { listener.onFailure(new IndexShardRestoreFailedException(shardId, "Failed to upload to remote segment store")); return; @@ -791,4 +796,31 @@ private void bootstrap(final IndexShard indexShard, final Store store) throws IO ); store.associateIndexWithNewTranslog(translogUUID); } + + /* + Blocks the calling thread, waiting for the remote store to sync, up to the internal remote upload timeout + */ + private void waitForRemoteStoreSync(IndexShard indexShard) { + if (indexShard.shardRouting.primary() == false) { + return; + } + long startNanos = System.nanoTime(); + + while (System.nanoTime() - startNanos < indexShard.getRecoverySettings().internalRemoteUploadTimeout().nanos()) { + try { + if (indexShard.isRemoteSegmentStoreInSync()) { + break; + } else { + try { + Thread.sleep(TimeValue.timeValueMinutes(1).millis()); + } catch (InterruptedException ie) { + throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie); + } + } + } catch (AlreadyClosedException e) { + // There is no point in waiting as the shard is now closed. + return; + } + } + } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 5351ae7fe08dd..2b41eb125d808 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -46,6 +46,8 @@ import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; +import java.util.concurrent.TimeUnit; + /** * Settings for the recovery mechanism * @@ -176,6 +178,13 @@ public class RecoverySettings { + public static final Setting<TimeValue> INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT = Setting.timeSetting( + "indices.recovery.internal_remote_upload_timeout", + new TimeValue(1, TimeUnit.HOURS), + Property.Dynamic, + Property.NodeScope + ); + + // choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
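// (G1 treats an allocation of at least half a region as humongous; with the minimum 1 MB region size a full 512 KB byte[] plus header would qualify, so the chunk stays just below that.)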
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES); @@ -193,6 +202,7 @@ public class RecoverySettings { private volatile int minRemoteSegmentMetadataFiles; private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; + private volatile TimeValue internalRemoteUploadTimeout; public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings); @@ -216,6 +226,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { } logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec); + this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks); @@ -237,6 +248,8 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING, this::setMinRemoteSegmentMetadataFiles ); + clusterSettings.addSettingsUpdateConsumer(INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, this::setInternalRemoteUploadTimeout); + } public RateLimiter rateLimiter() { @@ -267,6 +280,10 @@ public TimeValue internalActionLongTimeout() { return internalActionLongTimeout; } + public TimeValue internalRemoteUploadTimeout() { + return internalRemoteUploadTimeout; + } + public ByteSizeValue getChunkSize() { return chunkSize; } @@ -298,6 +315,10 @@ public void setInternalActionLongTimeout(TimeValue internalActionLongTimeout) { this.internalActionLongTimeout = internalActionLongTimeout; } + public void setInternalRemoteUploadTimeout(TimeValue internalRemoteUploadTimeout) { + this.internalRemoteUploadTimeout = internalRemoteUploadTimeout; + } + private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { this.maxBytesPerSec = maxBytesPerSec; if (maxBytesPerSec.getBytes() <= 0) { diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesModule.java b/server/src/main/java/org/opensearch/repositories/RepositoriesModule.java index cc4d3c006d84c..afb6e530b0eec 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesModule.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesModule.java @@ -39,6 +39,7 @@ import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.plugins.RepositoryPlugin; import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -71,6 +72,11 @@ public RepositoriesModule( metadata -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) ); + factories.put( + ReloadableFsRepository.TYPE, + metadata -> new ReloadableFsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) + ); + for (RepositoryPlugin repoPlugin : repoPlugins) { Map newRepoTypes = repoPlugin.getRepositories( env, diff --git a/server/src/main/java/org/opensearch/repositories/fs/ReloadableFsRepository.java b/server/src/main/java/org/opensearch/repositories/fs/ReloadableFsRepository.java index c06c805a39396..e8020a432a58a 100644 --- a/server/src/main/java/org/opensearch/repositories/fs/ReloadableFsRepository.java +++ 
b/server/src/main/java/org/opensearch/repositories/fs/ReloadableFsRepository.java @@ -8,18 +8,52 @@ package org.opensearch.repositories.fs; +import org.opensearch.OpenSearchException; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Randomness; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.fs.FsBlobContainer; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.settings.Setting; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.indices.recovery.RecoverySettings; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.Random; + /** - * Extension of {@link FsRepository} that can be reloaded inplace + * Extension of {@link FsRepository} that can be reloaded in-place; supports failing operations and slowing them down * * @opensearch.internal */ public class ReloadableFsRepository extends FsRepository { + public static final String TYPE = "reloadable-fs"; + + private final FailSwitch fail; + private final SlowDownWriteSwitch slowDown; + + public static final Setting<Integer> REPOSITORIES_FAILRATE_SETTING = Setting.intSetting( + "repositories.fail.rate", + 0, + 0, + 100, + Setting.Property.NodeScope + ); + + public static final Setting<Integer> REPOSITORIES_SLOWDOWN_SETTING = Setting.intSetting( + "repositories.slowdown", + 0, + 0, + 100, + Setting.Property.NodeScope + ); + /** * Constructs a shared file system repository that is reloadable in-place. */ @@ -31,6 +65,11 @@ public ReloadableFsRepository( RecoverySettings recoverySettings ) { super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings); + fail = new FailSwitch(); + fail.failRate(REPOSITORIES_FAILRATE_SETTING.get(metadata.settings())); + slowDown = new SlowDownWriteSwitch(); + slowDown.setSleepSeconds(REPOSITORIES_SLOWDOWN_SETTING.get(metadata.settings())); + readRepositoryMetadata(); } @Override @@ -40,12 +79,124 @@ public boolean isReloadable() { @Override public void reload(RepositoryMetadata repositoryMetadata) { - if (isReloadable() == false) { - return; - } - super.reload(repositoryMetadata); + readRepositoryMetadata(); validateLocation(); readMetadata(); } + + private void readRepositoryMetadata() { + fail.failRate(REPOSITORIES_FAILRATE_SETTING.get(metadata.settings())); + slowDown.setSleepSeconds(REPOSITORIES_SLOWDOWN_SETTING.get(metadata.settings())); + } + + protected BlobStore createBlobStore() throws Exception { + final String location = REPOSITORIES_LOCATION_SETTING.get(getMetadata().settings()); + final Path locationFile = environment.resolveRepoFile(location); + return new ThrowingBlobStore(bufferSize, locationFile, isReadOnly(), fail, slowDown); + } + + // A random integer from min-max (inclusive).
+ public static int randomIntBetween(int min, int max) { + Random random = Randomness.get(); + return random.nextInt(max - min + 1) + min; + } + + static class FailSwitch { + private volatile int failRate; + private volatile boolean onceFailedFailAlways = false; + + public boolean fail() { + final int rnd = randomIntBetween(1, 100); + boolean fail = rnd <= failRate; + if (fail && onceFailedFailAlways) { + failAlways(); + } + return fail; + } + + public void failAlways() { + failRate = 100; + } + + public void failRate(int rate) { + failRate = rate; + } + + public void onceFailedFailAlways() { + onceFailedFailAlways = true; + } + } + + static class SlowDownWriteSwitch { + private volatile int sleepSeconds; + + public void setSleepSeconds(int sleepSeconds) { + this.sleepSeconds = sleepSeconds; + } + + public int getSleepSeconds() { + return sleepSeconds; + } + } + + private static class ThrowingBlobStore extends FsBlobStore { + + private final FailSwitch fail; + private final SlowDownWriteSwitch slowDown; + + public ThrowingBlobStore(int bufferSizeInBytes, Path path, boolean readonly, FailSwitch fail, SlowDownWriteSwitch slowDown) + throws IOException { + super(bufferSizeInBytes, path, readonly); + this.fail = fail; + this.slowDown = slowDown; + } + + @Override + public BlobContainer blobContainer(BlobPath path) { + try { + return new ThrowingBlobContainer(this, path, buildAndCreate(path), fail, slowDown); + } catch (IOException ex) { + throw new OpenSearchException("failed to create blob container", ex); + } + } + } + + private static class ThrowingBlobContainer extends FsBlobContainer { + + private final FailSwitch fail; + private final SlowDownWriteSwitch slowDown; + + public ThrowingBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, FailSwitch fail, SlowDownWriteSwitch slowDown) { + super(blobStore, blobPath, path); + this.fail = fail; + this.slowDown = slowDown; + } + + @Override + public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists) + throws IOException { + checkFailRateAndSleep(blobName); + super.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); + } + + private void checkFailRateAndSleep(String blobName) throws IOException { + if (fail.fail() && blobName.contains(".dat") == false) { + throw new IOException("blob container throwing error"); + } + if (slowDown.getSleepSeconds() > 0) { + try { + Thread.sleep(slowDown.getSleepSeconds() * 1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + checkFailRateAndSleep(blobName); + super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); + } + } } diff --git a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat b/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat index 2c92f0ecd3f51..80b1d25064885 100644 --- a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat +++ b/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat @@ -1 +1,2 @@ org.apache.lucene.search.suggest.document.Completion50PostingsFormat +org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat diff --git a/server/src/test/java/org/opensearch/index/codec/fuzzy/BloomFilterTests.java 
b/server/src/test/java/org/opensearch/index/codec/fuzzy/BloomFilterTests.java new file mode 100644 index 0000000000000..92669d5bc1d92 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/codec/fuzzy/BloomFilterTests.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.ByteArrayDataOutput; +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public class BloomFilterTests extends OpenSearchTestCase { + + public void testBloomFilterSerializationDeserialization() throws IOException { + int elementCount = randomIntBetween(1, 100); + long maxDocs = elementCount * 10L; // Keeping this high so that it ensures some bits are not set. + BloomFilter filter = new BloomFilter(maxDocs, getFpp(), () -> idIterator(elementCount)); + byte[] buffer = new byte[(int) maxDocs * 5]; + ByteArrayDataOutput out = new ByteArrayDataOutput(buffer); + + // Write in the format readable through factory + out.writeString(filter.setType().getSetName()); + filter.writeTo(out); + + FuzzySet reconstructedFilter = FuzzySetFactory.deserializeFuzzySet(new ByteArrayIndexInput("filter", buffer)); + assertEquals(FuzzySet.SetType.BLOOM_FILTER_V1, reconstructedFilter.setType()); + + Iterator idIterator = idIterator(elementCount); + while (idIterator.hasNext()) { + BytesRef element = idIterator.next(); + assertEquals(FuzzySet.Result.MAYBE, reconstructedFilter.contains(element)); + assertEquals(FuzzySet.Result.MAYBE, filter.contains(element)); + } + } + + public void testBloomFilterIsSaturated_returnsTrue() throws IOException { + BloomFilter bloomFilter = new BloomFilter(1L, getFpp(), () -> idIterator(1000)); + assertEquals(FuzzySet.SetType.BLOOM_FILTER_V1, bloomFilter.setType()); + assertEquals(true, bloomFilter.isSaturated()); + } + + public void testBloomFilterIsSaturated_returnsFalse() throws IOException { + int elementCount = randomIntBetween(1, 100); + BloomFilter bloomFilter = new BloomFilter(20000, getFpp(), () -> idIterator(elementCount)); + assertEquals(FuzzySet.SetType.BLOOM_FILTER_V1, bloomFilter.setType()); + assertEquals(false, bloomFilter.isSaturated()); + } + + public void testBloomFilterWithLargeCapacity() throws IOException { + long maxDocs = randomLongBetween(Integer.MAX_VALUE, 5L * Integer.MAX_VALUE); + BloomFilter bloomFilter = new BloomFilter(maxDocs, getFpp(), () -> List.of(new BytesRef("bar")).iterator()); + assertEquals(FuzzySet.SetType.BLOOM_FILTER_V1, bloomFilter.setType()); + } + + private double getFpp() { + return randomDoubleBetween(0.01, 0.50, true); + } + + private Iterator idIterator(int count) { + return new Iterator() { + int cnt = count; + + @Override + public boolean hasNext() { + return cnt-- > 0; + } + + @Override + public BytesRef next() { + return new BytesRef(Integer.toString(cnt)); + } + }; + } +} diff --git a/server/src/test/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormatTests.java b/server/src/test/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormatTests.java new file mode 100644 index 0000000000000..868c2175d0689 --- /dev/null +++ 
b/server/src/test/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormatTests.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.tests.index.BasePostingsFormatTestCase; +import org.apache.lucene.tests.util.TestUtil; + +import java.util.TreeMap; + +public class FuzzyFilterPostingsFormatTests extends BasePostingsFormatTestCase { + + private TreeMap params = new TreeMap<>() { + @Override + public FuzzySetParameters get(Object k) { + return new FuzzySetParameters(() -> FuzzySetParameters.DEFAULT_FALSE_POSITIVE_PROBABILITY); + } + }; + + private Codec fuzzyFilterCodec = TestUtil.alwaysPostingsFormat( + new FuzzyFilterPostingsFormat(TestUtil.getDefaultPostingsFormat(), new FuzzySetFactory(params)) + ); + + @Override + protected Codec getCodec() { + return fuzzyFilterCodec; + } +} diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 6567cb03f3dc6..85878cc2e1c9d 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -10,6 +10,7 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.tests.store.BaseDirectoryWrapper; @@ -102,6 +103,16 @@ private void indexDocs(int startDocId, int numberOfDocs) throws IOException { public void tearDown() throws Exception { Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate(); ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + + for (ReferenceManager.RefreshListener refreshListener : indexShard.getEngine().config().getInternalRefreshListener()) { + if (refreshListener instanceof ReleasableRetryableRefreshListener) { + ((ReleasableRetryableRefreshListener) refreshListener).drainRefreshes(); + } + } + if (remoteStoreRefreshListener != null) { + remoteStoreRefreshListener.drainRefreshes(); + } + closeShards(indexShard); super.tearDown(); } @@ -335,6 +346,7 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2(); RemoteSegmentTransferTracker segmentTracker = trackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 0); + assertTrue("remote store in sync", tuple.v1().isRemoteSegmentStoreInSync()); } public void testRefreshSuccessOnSecondAttempt() throws Exception { @@ -404,6 +416,20 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception { assertNoLagAndTotalUploadsFailed(segmentTracker, 2); } + public void testRefreshPersistentFailure() throws Exception { + int succeedOnAttempt = 10; + CountDownLatch refreshCountLatch = new CountDownLatch(1); + CountDownLatch successLatch = new CountDownLatch(10); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch + ); + // Giving 10ms for some iterations of remote refresh 
upload + Thread.sleep(10); + assertFalse("remote store should not be in sync", tuple.v1().isRemoteSegmentStoreInSync()); + } + private void assertNoLagAndTotalUploadsFailed(RemoteSegmentTransferTracker segmentTracker, long totalUploadsFailed) throws Exception { assertBusy(() -> { assertEquals(0, segmentTracker.getBytesLag()); @@ -568,6 +594,7 @@ private Tuple mockIn // Mock indexShard.getSegmentInfosSnapshot() doAnswer(invocation -> { if (counter.incrementAndGet() <= succeedOnAttempt) { + logger.error("Failing in get segment info {}", counter.get()); throw new RuntimeException("Inducing failure in upload"); } return indexShard.getSegmentInfosSnapshot(); @@ -583,6 +610,7 @@ private Tuple mockIn doAnswer(invocation -> { if (Objects.nonNull(successLatch)) { successLatch.countDown(); + logger.info("Value of latch {}", successLatch.getCount()); } return indexShard.getEngine(); }).when(shard).getEngine(); @@ -642,6 +670,31 @@ private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentSto } } } + assertTrue(remoteStoreRefreshListener.isRemoteSegmentStoreInSync()); + } + + public void testRemoteSegmentStoreNotInSync() throws IOException { + setup(true, 3); + remoteStoreRefreshListener.afterRefresh(true); + try (Store remoteStore = indexShard.remoteStore()) { + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); + verifyUploadedSegments(remoteSegmentStoreDirectory); + remoteStoreRefreshListener.isRemoteSegmentStoreInSync(); + boolean oneFileDeleted = false; + // Delete any one file from remote store + try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { + SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); + for (String file : segmentInfos.files(true)) { + if (oneFileDeleted == false && RemoteStoreRefreshListener.EXCLUDE_FILES.contains(file) == false) { + remoteSegmentStoreDirectory.deleteFile(file); + oneFileDeleted = true; + break; + } + } + } + assertFalse(remoteStoreRefreshListener.isRemoteSegmentStoreInSync()); + } } } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 412d5235fe462..bf1c4d4c94e04 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -785,10 +785,10 @@ protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMet protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId shardId, Path path) throws IOException { NodeEnvironment.NodePath remoteNodePath = new NodeEnvironment.NodePath(path); ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId); - RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); - RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex()); + RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex().resolve("data")); + RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex().resolve("metadata")); RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( - new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())) + new
RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex().resolve("lock_files"))) ); return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool, shardId); } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 72d68ffb9449d..0f9ab3aa3d64f 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -197,6 +197,8 @@ import static org.opensearch.core.common.util.CollectionUtils.eagerPartition; import static org.opensearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING; import static org.opensearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING; +import static org.opensearch.index.IndexSettings.INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING; +import static org.opensearch.index.IndexSettings.INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING; import static org.opensearch.index.IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.test.XContentTestUtils.convertToMap; @@ -630,6 +632,11 @@ public Settings indexSettings() { ); } + if (randomBoolean()) { + builder.put(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING.getKey(), true); + builder.put(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING.getKey(), randomDoubleBetween(0.01, 0.50, true)); + } + return builder.build(); } @@ -646,6 +653,9 @@ protected Settings featureFlagSettings() { } // Enabling Telemetry setting by default featureSettings.put(FeatureFlags.TELEMETRY_SETTING.getKey(), true); + + // Enabling fuzzy set for tests by default + featureSettings.put(FeatureFlags.DOC_ID_FUZZY_SET_SETTING.getKey(), true); return featureSettings.build(); }
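For orientation, the following is a minimal sketch (not part of the change above) of how the new fuzzy-codec pieces compose: a FuzzySetFactory configured with bloom filter parameters for the _id field, wrapped around a delegate postings format, mirroring what FuzzyFilterPostingsFormatTests does. The standalone class name, the chosen false positive probability, and the "Lucene99" delegate format name are illustrative assumptions only.

import org.apache.lucene.codecs.PostingsFormat;
import org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat;
import org.opensearch.index.codec.fuzzy.FuzzySetFactory;
import org.opensearch.index.codec.fuzzy.FuzzySetParameters;

import java.util.Map;

public class FuzzyFilterWiringSketch {

    /** Builds a postings format that writes a .fzd bloom filter for the _id field on top of a delegate format. */
    public static PostingsFormat fuzzyDocIdPostingsFormat(double falsePositiveProbability) {
        // One FuzzySetParameters entry per field that should get a fuzzy set; BLOOM_FILTER_V1 is the only set type today.
        FuzzySetFactory fuzzySetFactory = new FuzzySetFactory(
            Map.of("_id", new FuzzySetParameters(() -> falsePositiveProbability))
        );
        // The delegate handles the regular postings; "Lucene99" is assumed to be the default format name here.
        return new FuzzyFilterPostingsFormat(PostingsFormat.forName("Lucene99"), fuzzySetFactory);
    }
}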