Skip to content

Commit

Permalink
Close query cache on index service creation failure (#48230)
Browse files Browse the repository at this point in the history
Today it is possible that we create the `QueryCache` and then fail to create
the owning `IndexService` and this means we do not close the `QueryCache`
again. This commit addresses that leak.

Fixes #48186
  • Loading branch information
DaveCTurner authored Oct 21, 2019
1 parent f9227da commit f9a9dcb
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 33 deletions.
41 changes: 28 additions & 13 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.cache.query.IndexQueryCache;
import org.elasticsearch.index.cache.query.QueryCache;
Expand Down Expand Up @@ -399,22 +401,35 @@ public IndexService newIndexService(
indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get();
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
final QueryCache queryCache;
if (indexSettings.getValue(INDEX_QUERY_CACHE_ENABLED_SETTING)) {
BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = forceQueryCacheProvider.get();
if (queryCacheProvider == null) {
queryCache = new IndexQueryCache(indexSettings, indicesQueryCache);
QueryCache queryCache = null;
IndexAnalyzers indexAnalyzers = null;
boolean success = false;
try {
if (indexSettings.getValue(INDEX_QUERY_CACHE_ENABLED_SETTING)) {
BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = forceQueryCacheProvider.get();
if (queryCacheProvider == null) {
queryCache = new IndexQueryCache(indexSettings, indicesQueryCache);
} else {
queryCache = queryCacheProvider.apply(indexSettings, indicesQueryCache);
}
} else {
queryCache = queryCacheProvider.apply(indexSettings, indicesQueryCache);
queryCache = new DisabledQueryCache(indexSettings);
}
if (IndexService.needsMapperService(indexSettings, indexCreationContext)) {
indexAnalyzers = analysisRegistry.build(indexSettings);
}
final IndexService indexService = new IndexService(indexSettings, indexCreationContext, environment, xContentRegistry,
new SimilarityService(indexSettings, scriptService, similarities), shardStoreDeleter, indexAnalyzers,
engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache,
directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners,
indexOperationListeners, namedWriteableRegistry);
success = true;
return indexService;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(queryCache, indexAnalyzers);
}
} else {
queryCache = new DisabledQueryCache(indexSettings);
}
return new IndexService(indexSettings, indexCreationContext, environment, xContentRegistry,
new SimilarityService(indexSettings, scriptService, similarities),
shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService,
clusterService, client, queryCache, directoryFactory, eventListener, readerWrapperFactory, mapperRegistry,
indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry);
}

private static IndexStorePlugin.DirectoryFactory getDirectoryFactory(
Expand Down
33 changes: 18 additions & 15 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
Expand Down Expand Up @@ -147,8 +146,7 @@ public IndexService(
NamedXContentRegistry xContentRegistry,
SimilarityService similarityService,
ShardStoreDeleter shardStoreDeleter,
AnalysisRegistry registry,
EngineFactory engineFactory,
IndexAnalyzers indexAnalyzers, EngineFactory engineFactory,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
Expand All @@ -163,24 +161,16 @@ public IndexService(
IndicesFieldDataCache indicesFieldDataCache,
List<SearchOperationListener> searchOperationListeners,
List<IndexingOperationListener> indexingOperationListeners,
NamedWriteableRegistry namedWriteableRegistry) throws IOException {
NamedWriteableRegistry namedWriteableRegistry) {
super(indexSettings);
this.indexSettings = indexSettings;
this.xContentRegistry = xContentRegistry;
this.similarityService = similarityService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.circuitBreakerService = circuitBreakerService;
if (indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE &&
indexCreationContext == IndexCreationContext.CREATE_INDEX) { // metadata verification needs a mapper service
this.mapperService = null;
this.indexFieldData = null;
this.indexSortSupplier = () -> null;
this.bitsetFilterCache = null;
this.warmer = null;
this.indexCache = null;
} else {
this.mapperService = new MapperService(indexSettings, registry.build(indexSettings), xContentRegistry, similarityService,
mapperRegistry,
if (needsMapperService(indexSettings, indexCreationContext)) {
assert indexAnalyzers != null;
this.mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
// we parse all percolator queries as they would be parsed on shard 0
() -> newQueryShardContext(0, null, System::currentTimeMillis, null));
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService);
Expand All @@ -198,6 +188,14 @@ public IndexService(
this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this));
this.warmer = new IndexWarmer(threadPool, indexFieldData, bitsetFilterCache.createListener(threadPool));
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
} else {
assert indexAnalyzers == null;
this.mapperService = null;
this.indexFieldData = null;
this.indexSortSupplier = () -> null;
this.bitsetFilterCache = null;
this.warmer = null;
this.indexCache = null;
}

this.shardStoreDeleter = shardStoreDeleter;
Expand All @@ -222,6 +220,11 @@ public IndexService(
updateFsyncTaskIfNecessary();
}

static boolean needsMapperService(IndexSettings indexSettings, IndexCreationContext indexCreationContext) {
return false == (indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE &&
indexCreationContext == IndexCreationContext.CREATE_INDEX); // metadata verification needs a mapper service
}

public enum IndexCreationContext {
CREATE_INDEX,
META_DATA_VERIFICATION
Expand Down
95 changes: 90 additions & 5 deletions server/src/test/java/org/elasticsearch/index/IndexModuleTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.index;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.AssertingDirectoryReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInvertState;
Expand All @@ -40,12 +41,15 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.analysis.AnalyzerProvider;
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.cache.query.IndexQueryCache;
import org.elasticsearch.index.cache.query.QueryCache;
Expand All @@ -65,6 +69,7 @@
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
Expand All @@ -84,13 +89,17 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;

Expand Down Expand Up @@ -174,7 +183,7 @@ public void testRegisterIndexStore() throws IOException {
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "foo_store")
.build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = Collections.singletonMap(
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = singletonMap(
"foo_store", new FooFunction());
final IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), indexStoreFactories);

Expand Down Expand Up @@ -354,11 +363,19 @@ public void testForceCustomQueryCache() throws IOException {
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap());
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
expectThrows(AlreadySetException.class, () -> module.forceQueryCacheProvider((a, b) -> new CustomQueryCache()));
final Set<CustomQueryCache> liveQueryCaches = new HashSet<>();
module.forceQueryCacheProvider((a, b) -> {
final CustomQueryCache customQueryCache = new CustomQueryCache(liveQueryCaches);
liveQueryCaches.add(customQueryCache);
return customQueryCache;
});
expectThrows(AlreadySetException.class, () -> module.forceQueryCacheProvider((a, b) -> {
throw new AssertionError("never called");
}));
IndexService indexService = newIndexService(module);
assertTrue(indexService.cache().query() instanceof CustomQueryCache);
indexService.close("simon says", false);
assertThat(liveQueryCaches, empty());
}

public void testDefaultQueryCacheImplIsSelected() throws IOException {
Expand All @@ -379,12 +396,73 @@ public void testDisableQueryCacheHasPrecedenceOverForceQueryCache() throws IOExc
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap());
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache(null));
IndexService indexService = newIndexService(module);
assertTrue(indexService.cache().query() instanceof DisabledQueryCache);
indexService.close("simon says", false);
}

public void testCustomQueryCacheCleanedUpIfIndexServiceCreationFails() {
Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap());
final Set<CustomQueryCache> liveQueryCaches = new HashSet<>();
module.forceQueryCacheProvider((a, b) -> {
final CustomQueryCache customQueryCache = new CustomQueryCache(liveQueryCaches);
liveQueryCaches.add(customQueryCache);
return customQueryCache;
});
threadPool.shutdown(); // causes index service creation to fail
expectThrows(EsRejectedExecutionException.class, () -> newIndexService(module));
assertThat(liveQueryCaches, empty());
}

public void testIndexAnalyzersCleanedUpIfIndexServiceCreationFails() {
Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);

final HashSet<Analyzer> openAnalyzers = new HashSet<>();
final AnalysisModule.AnalysisProvider<AnalyzerProvider<?>> analysisProvider = (i,e,n,s) -> new AnalyzerProvider<>() {
@Override
public String name() {
return "test";
}

@Override
public AnalyzerScope scope() {
return AnalyzerScope.INDEX;
}

@Override
public Analyzer get() {
final Analyzer analyzer = new Analyzer() {
@Override
protected TokenStreamComponents createComponents(String fieldName) {
throw new AssertionError("should not be here");
}

@Override
public void close() {
super.close();
openAnalyzers.remove(this);
}
};
openAnalyzers.add(analyzer);
return analyzer;
}
};
final AnalysisRegistry analysisRegistry = new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(),
singletonMap("test", analysisProvider), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap());
IndexModule module = new IndexModule(indexSettings, analysisRegistry, new InternalEngineFactory(), Collections.emptyMap());
threadPool.shutdown(); // causes index service creation to fail
expectThrows(EsRejectedExecutionException.class, () -> newIndexService(module));
assertThat(openAnalyzers, empty());
}

public void testMmapNotAllowed() {
String storeType = randomFrom(IndexModule.Type.HYBRIDFS.getSettingsKey(), IndexModule.Type.MMAPFS.getSettingsKey());
final Settings settings = Settings.builder()
Expand All @@ -403,12 +481,19 @@ public void testMmapNotAllowed() {

class CustomQueryCache implements QueryCache {

private final Set<CustomQueryCache> liveQueryCaches;

CustomQueryCache(Set<CustomQueryCache> liveQueryCaches) {
this.liveQueryCaches = liveQueryCaches;
}

@Override
public void clear(String reason) {
}

@Override
public void close() throws IOException {
public void close() {
assertTrue(liveQueryCaches == null || liveQueryCaches.remove(this));
}

@Override
Expand Down

0 comments on commit f9a9dcb

Please sign in to comment.