diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 888e034849ad..ade67fb39a4c 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -56,7 +56,9 @@ API Changes New Features --------------------- -(No changes) +* LUCENE-10422: Monitor Improvements: `Monitor` can use a custom `Directory` +implementation. `Monitor` can be created with a readonly `QueryIndex` in order to +have readonly `Monitor` instances. (Niko Usai) Improvements --------------------- diff --git a/lucene/monitor/src/java/org/apache/lucene/monitor/Monitor.java b/lucene/monitor/src/java/org/apache/lucene/monitor/Monitor.java index 2faf05d25ee9..40a4be51b3ff 100644 --- a/lucene/monitor/src/java/org/apache/lucene/monitor/Monitor.java +++ b/lucene/monitor/src/java/org/apache/lucene/monitor/Monitor.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; import org.apache.lucene.index.LeafReader; @@ -38,7 +36,6 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Weight; -import org.apache.lucene.util.NamedThreadFactory; /** * A Monitor contains a set of {@link Query} objects with associated IDs, and efficiently matches @@ -51,14 +48,8 @@ public class Monitor implements Closeable { private final QueryIndex queryIndex; - private final List listeners = new ArrayList<>(); - private final long commitBatchSize; - private final ScheduledExecutorService purgeExecutor; - - private long lastPurged = -1; - /** * Create a non-persistent Monitor instance with the default term-filtering Presearcher * @@ -100,22 +91,11 @@ public Monitor(Analyzer analyzer, Presearcher presearcher, MonitorConfiguration this.analyzer = analyzer; this.presearcher = presearcher; - this.queryIndex = new QueryIndex(configuration, presearcher); - - long purgeFrequency = configuration.getPurgeFrequency(); - this.purgeExecutor = - Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("cache-purge")); - this.purgeExecutor.scheduleAtFixedRate( - () -> { - try { - purgeCache(); - } catch (Throwable e) { - listeners.forEach(l -> l.onPurgeError(e)); - } - }, - purgeFrequency, - purgeFrequency, - configuration.getPurgeFrequencyUnits()); + if (configuration.isReadOnly()) { + this.queryIndex = new ReadonlyQueryIndex(configuration); + } else { + this.queryIndex = new WritableQueryIndex(configuration, presearcher); + } this.commitBatchSize = configuration.getQueryUpdateBufferSize(); } @@ -127,12 +107,13 @@ public Monitor(Analyzer analyzer, Presearcher presearcher, MonitorConfiguration * @param listener listener to register */ public void addQueryIndexUpdateListener(MonitorUpdateListener listener) { - listeners.add(listener); + queryIndex.addListener(listener); } /** @return Statistics for the internal query index and cache */ - public QueryCacheStats getQueryCacheStats() { - return new QueryCacheStats(queryIndex.numDocs(), queryIndex.cacheSize(), lastPurged); + public QueryCacheStats getQueryCacheStats() throws IOException { + return new QueryCacheStats( + queryIndex.numDocs(), queryIndex.cacheSize(), queryIndex.getLastPurged()); } /** Statistics for the query cache and query index */ @@ -159,17 +140,17 @@ public QueryCacheStats(int queries, int cachedQueries, long lastPurged) { * *

This is normally called from a background thread at a rate set by configurePurgeFrequency(). * + *

When the Monitor is in read-only mode, the cache is NEVER purged automatically; you MUST call + * purgeCache() yourself to pick up new changes. + * * @throws IOException on IO errors */ public void purgeCache() throws IOException { queryIndex.purgeCache(); - lastPurged = System.nanoTime(); - listeners.forEach(MonitorUpdateListener::onPurge); } @Override public void close() throws IOException { - purgeExecutor.shutdown(); queryIndex.close(); } @@ -192,7 +173,6 @@ public void register(Iterable<MonitorQuery> queries) throws IOException { private void commit(List<MonitorQuery> updates) throws IOException { queryIndex.commit(updates); - listeners.forEach(l -> l.afterUpdate(updates)); } /** @@ -213,7 +193,6 @@ public void register(MonitorQuery... queries) throws IOException { */ public void deleteById(List<String> queryIds) throws IOException { queryIndex.deleteQueries(queryIds); - listeners.forEach(l -> l.afterDelete(queryIds)); } /** @@ -233,7 +212,6 @@ public void deleteById(String... queryIds) throws IOException { */ public void clear() throws IOException { queryIndex.clear(); - listeners.forEach(MonitorUpdateListener::afterClear); } /** @@ -287,7 +265,7 @@ public MonitorQuery getQuery(final String queryId) throws IOException { } /** @return the number of queries (after decomposition) stored in this Monitor */ - public int getDisjunctCount() { + public int getDisjunctCount() throws IOException { return queryIndex.numDocs(); } diff --git a/lucene/monitor/src/java/org/apache/lucene/monitor/MonitorConfiguration.java b/lucene/monitor/src/java/org/apache/lucene/monitor/MonitorConfiguration.java index 8a5735ca4e7e..beff7b4584c7 100644 --- a/lucene/monitor/src/java/org/apache/lucene/monitor/MonitorConfiguration.java +++ b/lucene/monitor/src/java/org/apache/lucene/monitor/MonitorConfiguration.java @@ -27,6 +27,7 @@ import org.apache.lucene.store.ByteBuffersDirectory; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.util.IOSupplier; /** Encapsulates various configuration settings for a Monitor's query index */ public class MonitorConfiguration { @@ -35,8 +36,9 @@ public class MonitorConfiguration { private long purgeFrequency = 5; private TimeUnit purgeFrequencyUnits = TimeUnit.MINUTES; private QueryDecomposer queryDecomposer = new QueryDecomposer(); - private Path indexPath = null; private MonitorQuerySerializer serializer; + private boolean readOnly = false; + private IOSupplier<Directory> directoryProvider = () -> new ByteBuffersDirectory(); private static IndexWriterConfig defaultIndexWriterConfig() { IndexWriterConfig iwc = new IndexWriterConfig(new KeywordAnalyzer()); @@ -47,16 +49,49 @@ private static IndexWriterConfig defaultIndexWriterConfig() { return iwc; } + public boolean isReadOnly() { + return readOnly; + } + + public IOSupplier<Directory> getDirectoryProvider() { + return directoryProvider; + } + + /** + * Sets a custom Directory provider, together with a custom query serializer. + * + *

This also gives you the option of configuring the Monitor as read-only. + * + * @param directoryProvider supplier for the index Directory implementation + * @param serializer the serializer used to store the queries + * @param readOnly whether the Monitor should be read-only + * @return this MonitorConfiguration + */ + public MonitorConfiguration setDirectoryProvider( + IOSupplier<Directory> directoryProvider, + MonitorQuerySerializer serializer, + Boolean readOnly) { + this.directoryProvider = directoryProvider; + this.serializer = serializer; + this.readOnly = readOnly; + return this; + } + + public MonitorConfiguration setDirectoryProvider( + IOSupplier<Directory> directoryProvider, MonitorQuerySerializer serializer) { + this.directoryProvider = directoryProvider; + this.serializer = serializer; + return this; + } + public MonitorConfiguration setIndexPath(Path indexPath, MonitorQuerySerializer serializer) { - this.indexPath = indexPath; this.serializer = serializer; + this.directoryProvider = () -> FSDirectory.open(indexPath); return this; } public IndexWriter buildIndexWriter() throws IOException { - Directory directory = - indexPath == null ? new ByteBuffersDirectory() : FSDirectory.open(indexPath); - return new IndexWriter(directory, getIndexWriterConfig()); + return new IndexWriter(directoryProvider.get(), getIndexWriterConfig()); } protected IndexWriterConfig getIndexWriterConfig() { diff --git a/lucene/monitor/src/java/org/apache/lucene/monitor/QueryIndex.java b/lucene/monitor/src/java/org/apache/lucene/monitor/QueryIndex.java index 1d55c5ee7c2b..3c40100ec080 100644 --- a/lucene/monitor/src/java/org/apache/lucene/monitor/QueryIndex.java +++ b/lucene/monitor/src/java/org/apache/lucene/monitor/QueryIndex.java @@ -19,233 +19,32 @@ import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.*; import java.util.function.BiPredicate; -import org.apache.lucene.document.BinaryDocValuesField; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.SortedDocValuesField; -import org.apache.lucene.document.StringField; -import org.apache.lucene.index.BinaryDocValues; -import org.apache.lucene.index.FieldInfo; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.SortedDocValues; -import org.apache.lucene.index.Term; -import org.apache.lucene.index.Terms; -import org.apache.lucene.index.TermsEnum; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.Scorable; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.SearcherFactory; -import org.apache.lucene.search.SearcherManager; -import org.apache.lucene.search.SimpleCollector; -import org.apache.lucene.search.TermQuery; +import org.apache.lucene.index.*; +import org.apache.lucene.search.*; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefHash; -import org.apache.lucene.util.IOUtils; - -class QueryIndex implements Closeable { +abstract class
QueryIndex implements Closeable { static final class FIELDS { static final String query_id = "_query_id"; static final String cache_id = "_cache_id"; static final String mq = "_mq"; } - private final IndexWriter writer; - private final SearcherManager manager; - private final QueryDecomposer decomposer; - private final MonitorQuerySerializer serializer; - private final Presearcher presearcher; - - /* Used to cache updates while a purge is ongoing */ - private volatile Map purgeCache = null; - - /* Used to lock around the creation of the purgeCache */ - private final ReadWriteLock purgeLock = new ReentrantReadWriteLock(); - private final Object commitLock = new Object(); - - /* The current query cache */ - private volatile ConcurrentMap queries = new ConcurrentHashMap<>(); - // NB this is not final because it can be replaced by purgeCache() + protected SearcherManager manager; + protected QueryDecomposer decomposer; + protected MonitorQuerySerializer serializer; // package-private for testing final Map termFilters = new HashMap<>(); - QueryIndex(MonitorConfiguration config, Presearcher presearcher) throws IOException { - this.writer = config.buildIndexWriter(); - this.manager = new SearcherManager(writer, true, true, new TermsHashBuilder()); - this.decomposer = config.getQueryDecomposer(); - this.serializer = config.getQuerySerializer(); - this.presearcher = presearcher; - populateQueryCache(serializer, decomposer); - } - - private void populateQueryCache(MonitorQuerySerializer serializer, QueryDecomposer decomposer) - throws IOException { - if (serializer == null) { - // No query serialization happening here - check that the cache is empty - IndexSearcher searcher = manager.acquire(); - try { - if (searcher.count(new MatchAllDocsQuery()) != 0) { - throw new IllegalStateException( - "Attempting to open a non-empty monitor query index with no MonitorQuerySerializer"); - } - } finally { - manager.release(searcher); - } - return; - } - Set ids = new HashSet<>(); - List errors = new ArrayList<>(); - purgeCache( - newCache -> - scan( - (id, cacheEntry, dataValues) -> { - if (ids.contains(id)) { - // this is a branch of a query that has already been reconstructed, but - // then split by decomposition - we don't need to parse it again - return; - } - ids.add(id); - try { - MonitorQuery mq = serializer.deserialize(dataValues.mq.binaryValue()); - for (QueryCacheEntry entry : QueryCacheEntry.decompose(mq, decomposer)) { - newCache.put(entry.cacheId, entry); - } - } catch (Exception e) { - errors.add(e); - } - })); - if (errors.size() > 0) { - IllegalStateException e = - new IllegalStateException("Couldn't parse some queries from the index"); - for (Exception parseError : errors) { - e.addSuppressed(parseError); - } - throw e; - } - } - - private class TermsHashBuilder extends SearcherFactory { - @Override - public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) - throws IOException { - IndexSearcher searcher = super.newSearcher(reader, previousReader); - searcher.setQueryCache(null); - termFilters.put(reader.getReaderCacheHelper().getKey(), new QueryTermFilter(reader)); - reader.getReaderCacheHelper().addClosedListener(termFilters::remove); - return searcher; - } - } - - void commit(List updates) throws IOException { - List indexables = buildIndexables(updates); - synchronized (commitLock) { - purgeLock.readLock().lock(); - try { - if (indexables.size() > 0) { - Set ids = new HashSet<>(); - for (Indexable update : indexables) { - ids.add(update.queryCacheEntry.queryId); - } 
- for (String id : ids) { - writer.deleteDocuments(new Term(FIELDS.query_id, id)); - } - for (Indexable update : indexables) { - this.queries.put(update.queryCacheEntry.cacheId, update.queryCacheEntry); - writer.addDocument(update.document); - if (purgeCache != null) - purgeCache.put(update.queryCacheEntry.cacheId, update.queryCacheEntry); - } - } - writer.commit(); - manager.maybeRefresh(); - } finally { - purgeLock.readLock().unlock(); - } - } - } - - private static class Indexable { - final QueryCacheEntry queryCacheEntry; - final Document document; - - private Indexable(QueryCacheEntry queryCacheEntry, Document document) { - this.queryCacheEntry = queryCacheEntry; - this.document = document; - } - } - - private static final BytesRef EMPTY = new BytesRef(); + protected final List listeners = new ArrayList<>(); - private List buildIndexables(List updates) { - List indexables = new ArrayList<>(); - for (MonitorQuery mq : updates) { - if (serializer != null && mq.getQueryString() == null) { - throw new IllegalArgumentException( - "Cannot add a MonitorQuery with a null string representation to a non-ephemeral Monitor"); - } - BytesRef serialized = serializer == null ? EMPTY : serializer.serialize(mq); - for (QueryCacheEntry qce : QueryCacheEntry.decompose(mq, decomposer)) { - Document doc = presearcher.indexQuery(qce.matchQuery, mq.getMetadata()); - doc.add(new StringField(FIELDS.query_id, qce.queryId, Field.Store.NO)); - doc.add(new SortedDocValuesField(FIELDS.cache_id, new BytesRef(qce.cacheId))); - doc.add(new SortedDocValuesField(FIELDS.query_id, new BytesRef(qce.queryId))); - doc.add(new BinaryDocValuesField(FIELDS.mq, serialized)); - indexables.add(new Indexable(qce, doc)); - } - } - return indexables; - } - - interface QueryBuilder { - Query buildQuery(BiPredicate termAcceptor) throws IOException; - } - - static class QueryTermFilter implements BiPredicate { - - private final Map termsHash = new HashMap<>(); - - QueryTermFilter(IndexReader reader) throws IOException { - for (LeafReaderContext ctx : reader.leaves()) { - for (FieldInfo fi : ctx.reader().getFieldInfos()) { - BytesRefHash terms = termsHash.computeIfAbsent(fi.name, f -> new BytesRefHash()); - Terms t = Terms.getTerms(ctx.reader(), fi.name); - TermsEnum te = t.iterator(); - BytesRef term; - while ((term = te.next()) != null) { - terms.add(term); - } - } - } - } + abstract void commit(List updates) throws IOException; - @Override - public boolean test(String field, BytesRef term) { - BytesRefHash bytes = termsHash.get(field); - if (bytes == null) { - return false; - } - return bytes.find(term) != -1; - } - } - - MonitorQuery getQuery(String queryId) throws IOException { + public MonitorQuery getQuery(String queryId) throws IOException { if (serializer == null) { throw new IllegalStateException( "Cannot get queries from an index with no MonitorQuerySerializer"); @@ -257,7 +56,7 @@ MonitorQuery getQuery(String queryId) throws IOException { return serializer.deserialize(bytesHolder[0]); } - void scan(QueryCollector matcher) throws IOException { + public void scan(QueryCollector matcher) throws IOException { search(new MatchAllDocsQuery(), matcher); } @@ -266,122 +65,27 @@ long search(final Query query, QueryCollector matcher) throws IOException { return search(builder, matcher); } - long search(QueryBuilder queryBuilder, QueryCollector matcher) throws IOException { - IndexSearcher searcher = null; - try { - Map queries; - - purgeLock.readLock().lock(); - try { - searcher = manager.acquire(); - queries = this.queries; - } 
finally { - purgeLock.readLock().unlock(); - } - - MonitorQueryCollector collector = new MonitorQueryCollector(queries, matcher); - long buildTime = System.nanoTime(); - Query query = - queryBuilder.buildQuery( - termFilters.get(searcher.getIndexReader().getReaderCacheHelper().getKey())); - buildTime = System.nanoTime() - buildTime; - searcher.search(query, collector); - return buildTime; - } finally { - if (searcher != null) { - manager.release(searcher); - } - } - } - - interface CachePopulator { - void populateCacheWithIndex(Map newCache) throws IOException; - } - - void purgeCache() throws IOException { - purgeCache( - newCache -> - scan( - (id, query, dataValues) -> { - if (query != null) newCache.put(query.cacheId, query); - })); - } - - /** - * Remove unused queries from the query cache. - * - *

This is normally called from a background thread at a rate set by configurePurgeFrequency(). - * - * @throws IOException on IO errors - */ - private synchronized void purgeCache(CachePopulator populator) throws IOException { - - // Note on implementation + abstract long search(QueryBuilder queryBuilder, QueryCollector matcher) throws IOException; - // The purge works by scanning the query index and creating a new query cache populated - // for each query in the index. When the scan is complete, the old query cache is swapped - // for the new, allowing it to be garbage-collected. + public abstract void purgeCache() throws IOException; - // In order to not drop cached queries that have been added while a purge is ongoing, - // we use a ReadWriteLock to guard the creation and removal of an register log. Commits take - // the read lock. If the register log has been created, then a purge is ongoing, and queries - // are added to the register log within the read lock guard. + abstract void purgeCache(CachePopulator populator) throws IOException; - // The purge takes the write lock when creating the register log, and then when swapping out - // the old query cache. Within the second write lock guard, the contents of the register log - // are added to the new query cache, and the register log itself is removed. + abstract int numDocs() throws IOException; - final ConcurrentMap newCache = new ConcurrentHashMap<>(); + public abstract int cacheSize(); - purgeLock.writeLock().lock(); - try { - purgeCache = new ConcurrentHashMap<>(); - } finally { - purgeLock.writeLock().unlock(); - } - - populator.populateCacheWithIndex(newCache); - - purgeLock.writeLock().lock(); - try { - newCache.putAll(purgeCache); - purgeCache = null; - queries = newCache; - } finally { - purgeLock.writeLock().unlock(); - } - } + abstract void deleteQueries(List ids) throws IOException; - // --------------------------------------------- - // Proxy trivial operations... - // --------------------------------------------- + abstract void clear() throws IOException; - @Override - public void close() throws IOException { - IOUtils.close(manager, writer, writer.getDirectory()); - } + public abstract long getLastPurged(); - int numDocs() { - return writer.getDocStats().numDocs; + public void addListener(MonitorUpdateListener listener) { + listeners.add(listener); } - int cacheSize() { - return queries.size(); - } - - void deleteQueries(Iterable ids) throws IOException { - for (String id : ids) { - writer.deleteDocuments(new Term(FIELDS.query_id, id)); - } - commit(Collections.emptyList()); - } - - void clear() throws IOException { - writer.deleteAll(); - commit(Collections.emptyList()); - } - - interface QueryCollector { + public interface QueryCollector { void matchQuery(String id, QueryCacheEntry query, DataValues dataValues) throws IOException; @@ -390,11 +94,15 @@ default ScoreMode scoreMode() { } } - // --------------------------------------------- - // Helper classes... - // --------------------------------------------- + interface QueryBuilder { + Query buildQuery(BiPredicate termAcceptor) throws IOException; + } + + interface CachePopulator { + void populateCacheWithIndex(Map newCache) throws IOException; + } - static final class DataValues { + public static final class DataValues { SortedDocValues queryId; SortedDocValues cacheId; BinaryDocValues mq; @@ -411,43 +119,33 @@ void advanceTo(int doc) throws IOException { } } - /** A Collector that decodes the stored query for each document hit. 
*/ - static final class MonitorQueryCollector extends SimpleCollector { - - private final Map queries; - private final QueryCollector matcher; - private final DataValues dataValues = new DataValues(); - - MonitorQueryCollector(Map queries, QueryCollector matcher) { - this.queries = queries; - this.matcher = matcher; - } - - @Override - public void setScorer(Scorable scorer) { - this.dataValues.scorer = scorer; - } + static class QueryTermFilter implements BiPredicate { - @Override - public void collect(int doc) throws IOException { - dataValues.advanceTo(doc); - BytesRef cache_id = dataValues.cacheId.lookupOrd(dataValues.cacheId.ordValue()); - BytesRef query_id = dataValues.queryId.lookupOrd(dataValues.queryId.ordValue()); - QueryCacheEntry query = queries.get(cache_id.utf8ToString()); - matcher.matchQuery(query_id.utf8ToString(), query, dataValues); - } + private final Map termsHash = new HashMap<>(); - @Override - public void doSetNextReader(LeafReaderContext context) throws IOException { - this.dataValues.cacheId = context.reader().getSortedDocValues(FIELDS.cache_id); - this.dataValues.queryId = context.reader().getSortedDocValues(FIELDS.query_id); - this.dataValues.mq = context.reader().getBinaryDocValues(FIELDS.mq); - this.dataValues.ctx = context; + QueryTermFilter(IndexReader reader) throws IOException { + for (LeafReaderContext ctx : reader.leaves()) { + for (FieldInfo fi : ctx.reader().getFieldInfos()) { + BytesRefHash terms = termsHash.computeIfAbsent(fi.name, f -> new BytesRefHash()); + Terms t = ctx.reader().terms(fi.name); + if (t != null) { + TermsEnum te = t.iterator(); + BytesRef term; + while ((term = te.next()) != null) { + terms.add(term); + } + } + } + } } @Override - public ScoreMode scoreMode() { - return matcher.scoreMode(); + public boolean test(String field, BytesRef term) { + BytesRefHash bytes = termsHash.get(field); + if (bytes == null) { + return false; + } + return bytes.find(term) != -1; } } } diff --git a/lucene/monitor/src/java/org/apache/lucene/monitor/ReadonlyQueryIndex.java b/lucene/monitor/src/java/org/apache/lucene/monitor/ReadonlyQueryIndex.java new file mode 100644 index 000000000000..e15a9b0cb705 --- /dev/null +++ b/lucene/monitor/src/java/org/apache/lucene/monitor/ReadonlyQueryIndex.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.lucene.monitor; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.*; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.NamedThreadFactory; + +class ReadonlyQueryIndex extends QueryIndex { + + private final ScheduledExecutorService refreshExecutor; + + public ReadonlyQueryIndex(MonitorConfiguration configuration) throws IOException { + if (configuration.getDirectoryProvider() == null) { + throw new IllegalStateException( + "You must specify a Directory when configuring a Monitor as read-only."); + } + Directory directory = configuration.getDirectoryProvider().get(); + this.manager = new SearcherManager(directory, new TermsHashBuilder(termFilters)); + this.decomposer = configuration.getQueryDecomposer(); + this.serializer = configuration.getQuerySerializer(); + this.refreshExecutor = + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("cache-purge")); + long refreshFrequency = configuration.getPurgeFrequency(); + this.refreshExecutor.scheduleAtFixedRate( + () -> { + try { + this.purgeCache(); + } catch (IOException e) { + listeners.forEach(l -> l.onPurgeError(e)); + } + }, + refreshFrequency, + refreshFrequency, + configuration.getPurgeFrequencyUnits()); + } + + @Override + public void commit(List<MonitorQuery> updates) throws IOException { + throw new UnsupportedOperationException("Monitor is readOnly, cannot commit"); + } + + @Override + public long search(QueryBuilder queryBuilder, QueryCollector matcher) throws IOException { + IndexSearcher searcher = null; + try { + searcher = manager.acquire(); + LazyMonitorQueryCollector collector = + new LazyMonitorQueryCollector(matcher, serializer, decomposer); + long buildTime = System.nanoTime(); + Query query = + queryBuilder.buildQuery( + termFilters.get(searcher.getIndexReader().getReaderCacheHelper().getKey())); + buildTime = System.nanoTime() - buildTime; + searcher.search(query, collector); + return buildTime; + } finally { + if (searcher != null) { + manager.release(searcher); + } + } + } + + @Override + public void purgeCache() throws IOException { + manager.maybeRefresh(); + listeners.forEach(MonitorUpdateListener::onPurge); + } + + @Override + void purgeCache(CachePopulator populator) { + throw new UnsupportedOperationException("Monitor is readOnly, it has no cache"); + } + + @Override + public void close() throws IOException { + refreshExecutor.shutdown(); + IOUtils.close(manager); + } + + @Override + public int numDocs() throws IOException { + IndexSearcher searcher = null; + int numDocs; + try { + searcher = manager.acquire(); + numDocs = searcher.getIndexReader().numDocs(); + } finally { + if (searcher != null) { + manager.release(searcher); + } + } + return numDocs; + } + + @Override + public int cacheSize() { + return -1; + } + + @Override + public void deleteQueries(List<String> ids) throws IOException { + throw new UnsupportedOperationException("Monitor is readOnly, cannot delete queries"); + } + + @Override + public void clear() throws IOException { + throw new UnsupportedOperationException("Monitor is readOnly, cannot clear"); + } + + @Override + public long getLastPurged() { + return -1; + } + + // --------------------------------------------- + // Helper classes...
+ // --------------------------------------------- + + /** A Collector that decodes the stored query for each document hit, re-parsing it every time. */ + static final class LazyMonitorQueryCollector extends SimpleCollector { + private final QueryIndex.QueryCollector matcher; + private final QueryIndex.DataValues dataValues = new QueryIndex.DataValues(); + private final MonitorQuerySerializer serializer; + private final QueryDecomposer decomposer; + + LazyMonitorQueryCollector( + QueryIndex.QueryCollector matcher, + MonitorQuerySerializer serializer, + QueryDecomposer decomposer) { + this.matcher = matcher; + this.serializer = serializer; + this.decomposer = decomposer; + } + + @Override + public void setScorer(Scorable scorer) { + this.dataValues.scorer = scorer; + } + + @Override + public void collect(int doc) throws IOException { + dataValues.advanceTo(doc); + BytesRef cache_id = dataValues.cacheId.lookupOrd(dataValues.cacheId.ordValue()); + BytesRef query_id = dataValues.queryId.lookupOrd(dataValues.queryId.ordValue()); + MonitorQuery mq = serializer.deserialize(dataValues.mq.binaryValue()); + QueryCacheEntry query = + QueryCacheEntry.decompose(mq, decomposer).stream() + .filter(queryCacheEntry -> queryCacheEntry.cacheId.equals(cache_id.utf8ToString())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Cached queries not found")); + matcher.matchQuery(query_id.utf8ToString(), query, dataValues); + } + + @Override + public void doSetNextReader(LeafReaderContext context) throws IOException { + this.dataValues.cacheId = context.reader().getSortedDocValues(QueryIndex.FIELDS.cache_id); + this.dataValues.queryId = context.reader().getSortedDocValues(QueryIndex.FIELDS.query_id); + this.dataValues.mq = context.reader().getBinaryDocValues(QueryIndex.FIELDS.mq); + this.dataValues.ctx = context; + } + + @Override + public ScoreMode scoreMode() { + return matcher.scoreMode(); + } + } +} diff --git a/lucene/monitor/src/java/org/apache/lucene/monitor/TermsHashBuilder.java b/lucene/monitor/src/java/org/apache/lucene/monitor/TermsHashBuilder.java new file mode 100644 index 000000000000..914df1dce1a3 --- /dev/null +++ b/lucene/monitor/src/java/org/apache/lucene/monitor/TermsHashBuilder.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.lucene.monitor; + +import java.io.IOException; +import java.util.Map; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.SearcherFactory; + +class TermsHashBuilder extends SearcherFactory { + private final Map termFilters; + + TermsHashBuilder(Map termFilters) { + this.termFilters = termFilters; + } + + @Override + public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) + throws IOException { + IndexSearcher searcher = super.newSearcher(reader, previousReader); + searcher.setQueryCache(null); + termFilters.put(reader.getReaderCacheHelper().getKey(), new QueryIndex.QueryTermFilter(reader)); + reader.getReaderCacheHelper().addClosedListener(termFilters::remove); + return searcher; + } +} diff --git a/lucene/monitor/src/java/org/apache/lucene/monitor/WritableQueryIndex.java b/lucene/monitor/src/java/org/apache/lucene/monitor/WritableQueryIndex.java new file mode 100644 index 000000000000..f4d7fa51c5e8 --- /dev/null +++ b/lucene/monitor/src/java/org/apache/lucene/monitor/WritableQueryIndex.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.lucene.monitor; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.lucene.document.*; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.*; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.NamedThreadFactory; + +class WritableQueryIndex extends QueryIndex { + + private final IndexWriter writer; + private final Presearcher presearcher; + + /* Used to cache updates while a purge is ongoing */ + private volatile Map purgeCache = null; + + /* Used to lock around the creation of the purgeCache */ + private final ReadWriteLock purgeLock = new ReentrantReadWriteLock(); + private final Object commitLock = new Object(); + + private final ScheduledExecutorService purgeExecutor; + + /* The current query cache */ + // NB this is not final because it can be replaced by purgeCache() + protected volatile ConcurrentMap queries; + + protected long lastPurged = -1; + + WritableQueryIndex(MonitorConfiguration configuration, Presearcher presearcher) + throws IOException { + + this.writer = configuration.buildIndexWriter(); + this.queries = new ConcurrentHashMap<>(); + this.manager = new SearcherManager(writer, true, true, new TermsHashBuilder(termFilters)); + this.decomposer = configuration.getQueryDecomposer(); + this.serializer = configuration.getQuerySerializer(); + this.presearcher = presearcher; + populateQueryCache(serializer, decomposer); + + long purgeFrequency = configuration.getPurgeFrequency(); + this.purgeExecutor = + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("cache-purge")); + this.purgeExecutor.scheduleAtFixedRate( + () -> { + try { + purgeCache(); + } catch (Throwable e) { + listeners.forEach(l -> l.onPurgeError(e)); + } + }, + purgeFrequency, + purgeFrequency, + configuration.getPurgeFrequencyUnits()); + } + + @Override + public void commit(List updates) throws IOException { + commitWithoutNotify(updates); + listeners.forEach(l -> l.afterUpdate(updates)); + } + + private void commitWithoutNotify(List updates) throws IOException { + List indexables = buildIndexables(updates); + synchronized (commitLock) { + purgeLock.readLock().lock(); + try { + if (indexables.size() > 0) { + Set ids = new HashSet<>(); + for (Indexable update : indexables) { + ids.add(update.queryCacheEntry.queryId); + } + for (String id : ids) { + writer.deleteDocuments(new Term(FIELDS.query_id, id)); + } + for (Indexable update : indexables) { + this.queries.put(update.queryCacheEntry.cacheId, update.queryCacheEntry); + writer.addDocument(update.document); + if (purgeCache != null) + purgeCache.put(update.queryCacheEntry.cacheId, update.queryCacheEntry); + } + } + writer.commit(); + manager.maybeRefresh(); + } finally { + purgeLock.readLock().unlock(); + } + } + } + + private static class Indexable { + final QueryCacheEntry queryCacheEntry; + final Document document; + + private Indexable(QueryCacheEntry queryCacheEntry, Document document) { + this.queryCacheEntry = queryCacheEntry; + this.document = document; + } + } + + private void populateQueryCache(MonitorQuerySerializer serializer, 
QueryDecomposer decomposer) + throws IOException { + if (serializer == null) { + // No query serialization happening here - check that the cache is empty + IndexSearcher searcher = manager.acquire(); + try { + if (searcher.count(new MatchAllDocsQuery()) != 0) { + throw new IllegalStateException( + "Attempting to open a non-empty monitor query index with no MonitorQuerySerializer"); + } + } finally { + manager.release(searcher); + } + return; + } + Set ids = new HashSet<>(); + List errors = new ArrayList<>(); + purgeCache( + newCache -> + scan( + (id, cacheEntry, dataValues) -> { + if (ids.contains(id)) { + // this is a branch of a query that has already been reconstructed, but + // then split by decomposition - we don't need to parse it again + return; + } + ids.add(id); + try { + MonitorQuery mq = serializer.deserialize(dataValues.mq.binaryValue()); + for (QueryCacheEntry entry : QueryCacheEntry.decompose(mq, decomposer)) { + newCache.put(entry.cacheId, entry); + } + } catch (Exception e) { + errors.add(e); + } + })); + if (errors.size() > 0) { + IllegalStateException e = + new IllegalStateException("Couldn't parse some queries from the index"); + for (Exception parseError : errors) { + e.addSuppressed(parseError); + } + throw e; + } + } + + private static final BytesRef EMPTY = new BytesRef(); + + private List buildIndexables(List updates) { + List indexables = new ArrayList<>(); + for (MonitorQuery mq : updates) { + if (serializer != null && mq.getQueryString() == null) { + throw new IllegalArgumentException( + "Cannot add a MonitorQuery with a null string representation to a non-ephemeral Monitor"); + } + BytesRef serialized = serializer == null ? EMPTY : serializer.serialize(mq); + for (QueryCacheEntry qce : QueryCacheEntry.decompose(mq, decomposer)) { + Document doc = presearcher.indexQuery(qce.matchQuery, mq.getMetadata()); + doc.add(new StringField(FIELDS.query_id, qce.queryId, Field.Store.NO)); + doc.add(new SortedDocValuesField(FIELDS.cache_id, new BytesRef(qce.cacheId))); + doc.add(new SortedDocValuesField(FIELDS.query_id, new BytesRef(qce.queryId))); + doc.add(new BinaryDocValuesField(FIELDS.mq, serialized)); + indexables.add(new Indexable(qce, doc)); + } + } + return indexables; + } + + @Override + public long search(QueryBuilder queryBuilder, QueryCollector matcher) throws IOException { + IndexSearcher searcher = null; + try { + Map queries; + + purgeLock.readLock().lock(); + try { + searcher = manager.acquire(); + queries = this.queries; + } finally { + purgeLock.readLock().unlock(); + } + + MonitorQueryCollector collector = new MonitorQueryCollector(queries, matcher); + long buildTime = System.nanoTime(); + Query query = + queryBuilder.buildQuery( + termFilters.get(searcher.getIndexReader().getReaderCacheHelper().getKey())); + buildTime = System.nanoTime() - buildTime; + searcher.search(query, collector); + return buildTime; + } finally { + if (searcher != null) { + manager.release(searcher); + } + } + } + + @Override + public void purgeCache() throws IOException { + purgeCache( + newCache -> + scan( + (id, query, dataValues) -> { + if (query != null) newCache.put(query.cacheId, query); + })); + lastPurged = System.nanoTime(); + listeners.forEach(MonitorUpdateListener::onPurge); + } + + @Override + /** + * Remove unused queries from the query cache. + * + *

This is normally called from a background thread at a rate set by configurePurgeFrequency(). + * + * @throws IOException on IO errors + */ + synchronized void purgeCache(CachePopulator populator) throws IOException { + + // Note on implementation + + // The purge works by scanning the query index and creating a new query cache populated + // for each query in the index. When the scan is complete, the old query cache is swapped + // for the new, allowing it to be garbage-collected. + + // In order to not drop cached queries that have been added while a purge is ongoing, + // we use a ReadWriteLock to guard the creation and removal of an register log. Commits take + // the read lock. If the register log has been created, then a purge is ongoing, and queries + // are added to the register log within the read lock guard. + + // The purge takes the write lock when creating the register log, and then when swapping out + // the old query cache. Within the second write lock guard, the contents of the register log + // are added to the new query cache, and the register log itself is removed. + + final ConcurrentMap newCache = new ConcurrentHashMap<>(); + + purgeLock.writeLock().lock(); + try { + purgeCache = new ConcurrentHashMap<>(); + } finally { + purgeLock.writeLock().unlock(); + } + + populator.populateCacheWithIndex(newCache); + + purgeLock.writeLock().lock(); + try { + newCache.putAll(purgeCache); + purgeCache = null; + queries = newCache; + } finally { + purgeLock.writeLock().unlock(); + } + } + + // --------------------------------------------- + // Proxy trivial operations... + // --------------------------------------------- + + @Override + public void close() throws IOException { + purgeExecutor.shutdown(); + IOUtils.close(manager, writer, writer.getDirectory()); + } + + @Override + public int numDocs() throws IOException { + return writer.getDocStats().numDocs; + } + + @Override + public int cacheSize() { + return queries.size(); + } + + @Override + public void deleteQueries(List ids) throws IOException { + for (String id : ids) { + writer.deleteDocuments(new Term(FIELDS.query_id, id)); + } + commitWithoutNotify(Collections.emptyList()); + listeners.forEach(l -> l.afterDelete(ids)); + } + + @Override + public void clear() throws IOException { + writer.deleteAll(); + commitWithoutNotify(Collections.emptyList()); + listeners.forEach(MonitorUpdateListener::afterClear); + } + + @Override + public long getLastPurged() { + return lastPurged; + } + + // --------------------------------------------- + // Helper classes... + // --------------------------------------------- + + /** A Collector that decodes the stored query for each document hit. 
*/ + static final class MonitorQueryCollector extends SimpleCollector { + + private final Map<String, QueryCacheEntry> queries; + private final QueryCollector matcher; + private final DataValues dataValues = new DataValues(); + + MonitorQueryCollector(Map<String, QueryCacheEntry> queries, QueryCollector matcher) { + this.queries = queries; + this.matcher = matcher; + } + + @Override + public void setScorer(Scorable scorer) { + this.dataValues.scorer = scorer; + } + + @Override + public void collect(int doc) throws IOException { + dataValues.advanceTo(doc); + BytesRef cache_id = dataValues.cacheId.lookupOrd(dataValues.cacheId.ordValue()); + BytesRef query_id = dataValues.queryId.lookupOrd(dataValues.queryId.ordValue()); + QueryCacheEntry query = queries.get(cache_id.utf8ToString()); + matcher.matchQuery(query_id.utf8ToString(), query, dataValues); + } + + @Override + public void doSetNextReader(LeafReaderContext context) throws IOException { + this.dataValues.cacheId = context.reader().getSortedDocValues(FIELDS.cache_id); + this.dataValues.queryId = context.reader().getSortedDocValues(FIELDS.query_id); + this.dataValues.mq = context.reader().getBinaryDocValues(FIELDS.mq); + this.dataValues.ctx = context; + } + + @Override + public ScoreMode scoreMode() { + return matcher.scoreMode(); + } + } +} diff --git a/lucene/monitor/src/test/org/apache/lucene/monitor/TestCachePurging.java b/lucene/monitor/src/test/org/apache/lucene/monitor/TestCachePurging.java index c77f622ed02c..e3ab9b48f7bc 100644 --- a/lucene/monitor/src/test/org/apache/lucene/monitor/TestCachePurging.java +++ b/lucene/monitor/src/test/org/apache/lucene/monitor/TestCachePurging.java @@ -155,7 +155,12 @@ public void testBackgroundPurges() throws IOException, InterruptedException { @Override public void onPurge() { // It can sometimes take a couple of purge runs to get everything in sync - if (monitor.getQueryCacheStats().cachedQueries == 99) latch.countDown(); + try { + if (monitor.getQueryCacheStats().cachedQueries == 99) latch.countDown(); + } catch (IOException e) { + // onPurge() cannot throw IOException, so wrap and rethrow + throw new RuntimeException(e); + } } }); diff --git a/lucene/monitor/src/test/org/apache/lucene/monitor/TestMonitorReadonly.java b/lucene/monitor/src/test/org/apache/lucene/monitor/TestMonitorReadonly.java new file mode 100644 index 000000000000..29a873eb34f1 --- /dev/null +++ b/lucene/monitor/src/test/org/apache/lucene/monitor/TestMonitorReadonly.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.lucene.monitor; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.core.WhitespaceAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.index.IndexNotFoundException; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.store.FSDirectory; +import org.junit.Test; + +public class TestMonitorReadonly extends MonitorTestBase { + private static final Analyzer ANALYZER = new WhitespaceAnalyzer(); + + @Test + public void testReadonlyMonitorThrowsOnInexistentIndex() { + Path indexDirectory = createTempDir(); + MonitorConfiguration config = + new MonitorConfiguration() + .setDirectoryProvider( + () -> FSDirectory.open(indexDirectory), + MonitorQuerySerializer.fromParser(MonitorTestBase::parse), + true); + assertThrows( + IndexNotFoundException.class, + () -> { + new Monitor(ANALYZER, config); + }); + } + + @Test + public void testReadonlyMonitorThrowsWhenCallingWriteRequests() throws IOException { + Path indexDirectory = createTempDir(); + MonitorConfiguration writeConfig = + new MonitorConfiguration() + .setIndexPath( + indexDirectory, MonitorQuerySerializer.fromParser(MonitorTestBase::parse)); + + // This will create the index + Monitor writeMonitor = new Monitor(ANALYZER, writeConfig); + writeMonitor.close(); + + MonitorConfiguration config = + new MonitorConfiguration() + .setDirectoryProvider( + () -> FSDirectory.open(indexDirectory), + MonitorQuerySerializer.fromParser(MonitorTestBase::parse), + true); + try (Monitor monitor = new Monitor(ANALYZER, config)) { + assertThrows( + UnsupportedOperationException.class, + () -> { + TermQuery query = new TermQuery(new Term(FIELD, "test")); + monitor.register( + new MonitorQuery("query1", query, query.toString(), Collections.emptyMap())); + }); + + assertThrows( + UnsupportedOperationException.class, + () -> { + monitor.deleteById("query1"); + }); + + assertThrows( + UnsupportedOperationException.class, + () -> { + monitor.clear(); + }); + } + } + + @Test + public void testSettingCustomDirectory() throws IOException { + Path indexDirectory = createTempDir(); + Document doc = new Document(); + doc.add(newTextField(FIELD, "This is a Foobar test document", Field.Store.NO)); + + MonitorConfiguration writeConfig = + new MonitorConfiguration() + .setDirectoryProvider( + () -> FSDirectory.open(indexDirectory), + MonitorQuerySerializer.fromParser(MonitorTestBase::parse)); + + try (Monitor writeMonitor = new Monitor(ANALYZER, writeConfig)) { + TermQuery query = new TermQuery(new Term(FIELD, "test")); + writeMonitor.register( + new MonitorQuery("query1", query, query.toString(), Collections.emptyMap())); + TermQuery query2 = new TermQuery(new Term(FIELD, "Foobar")); + writeMonitor.register( + new MonitorQuery("query2", query2, query2.toString(), Collections.emptyMap())); + MatchingQueries<QueryMatch> matches = writeMonitor.match(doc, QueryMatch.SIMPLE_MATCHER); + assertNotNull(matches.getMatches()); + assertEquals(2, matches.getMatchCount()); + assertNotNull(matches.matches("query2")); + } + } + + @Test + public void testMonitorReadOnlyCouldReadOnTheSameIndex() throws IOException { + Path indexDirectory = createTempDir(); + Document doc = new Document(); + doc.add(newTextField(FIELD, "This is a test document", Field.Store.NO));
+ + MonitorConfiguration writeConfig = + new MonitorConfiguration() + .setDirectoryProvider( + () -> FSDirectory.open(indexDirectory), + MonitorQuerySerializer.fromParser(MonitorTestBase::parse)); + + try (Monitor writeMonitor = new Monitor(ANALYZER, writeConfig)) { + TermQuery query = new TermQuery(new Term(FIELD, "test")); + writeMonitor.register( + new MonitorQuery("query1", query, query.toString(), Collections.emptyMap())); + } + + MonitorConfiguration readConfig = + new MonitorConfiguration() + .setDirectoryProvider( + () -> FSDirectory.open(indexDirectory), + MonitorQuerySerializer.fromParser(MonitorTestBase::parse), + true); + + try (Monitor readMonitor1 = new Monitor(ANALYZER, readConfig)) { + MatchingQueries<QueryMatch> matches = readMonitor1.match(doc, QueryMatch.SIMPLE_MATCHER); + assertNotNull(matches.getMatches()); + assertEquals(1, matches.getMatchCount()); + assertNotNull(matches.matches("query1")); + } + + try (Monitor readMonitor2 = new Monitor(ANALYZER, readConfig)) { + MatchingQueries<QueryMatch> matches = readMonitor2.match(doc, QueryMatch.SIMPLE_MATCHER); + assertNotNull(matches.getMatches()); + assertEquals(1, matches.getMatchCount()); + assertNotNull(matches.matches("query1")); + + assertThrows( + UnsupportedOperationException.class, + () -> { + TermQuery query = new TermQuery(new Term(FIELD, "test")); + readMonitor2.register( + new MonitorQuery("query1", query, query.toString(), Collections.emptyMap())); + }); + } + } + + @Test + public void testReadonlyMonitorGetsRefreshed() throws IOException, InterruptedException { + Path indexDirectory = createTempDir(); + Document doc = new Document(); + doc.add(newTextField(FIELD, "This is a test document", Field.Store.NO)); + + MonitorConfiguration writeConfig = + new MonitorConfiguration() + .setDirectoryProvider( + () -> FSDirectory.open(indexDirectory), + MonitorQuerySerializer.fromParser(MonitorTestBase::parse)); + + try (Monitor writeMonitor = new Monitor(ANALYZER, writeConfig)) { + TermQuery query = new TermQuery(new Term(FIELD, "test")); + writeMonitor.register( + new MonitorQuery("query1", query, query.toString(), Collections.emptyMap())); + + MonitorConfiguration readConfig = + new MonitorConfiguration() + .setPurgeFrequency(2, TimeUnit.SECONDS) + .setDirectoryProvider( + () -> FSDirectory.open(indexDirectory), + MonitorQuerySerializer.fromParser(MonitorTestBase::parse), + true); + + try (Monitor readMonitor = new Monitor(ANALYZER, readConfig)) { + MatchingQueries<QueryMatch> matches = readMonitor.match(doc, QueryMatch.SIMPLE_MATCHER); + assertNotNull(matches.getMatches()); + assertEquals(1, matches.getMatchCount()); + assertNotNull(matches.matches("query1")); + + TermQuery query2 = new TermQuery(new Term(FIELD, "test")); + writeMonitor.register( + new MonitorQuery("query2", query2, query2.toString(), Collections.emptyMap())); + + // The index returns stale results until the background refresh thread calls maybeRefresh + MatchingQueries<QueryMatch> matches2 = readMonitor.match(doc, QueryMatch.SIMPLE_MATCHER); + assertNotNull(matches2.getMatches()); + assertEquals(1, matches2.getMatchCount()); + CountDownLatch latch = new CountDownLatch(1); + readMonitor.addQueryIndexUpdateListener( + new MonitorUpdateListener() { + @Override + public void onPurge() { + latch.countDown(); + } + }); + assertTrue( + latch.await(readConfig.getPurgeFrequency() + 1, readConfig.getPurgeFrequencyUnits())); + + // Once the purge frequency has elapsed, the refreshed results are visible + MatchingQueries<QueryMatch> matches3 = readMonitor.match(doc, QueryMatch.SIMPLE_MATCHER); + assertNotNull(matches3.getMatches()); + assertEquals(2,
matches3.getMatchCount()); + } + } + } +} diff --git a/lucene/monitor/src/test/org/apache/lucene/monitor/TestQueryTermFilter.java b/lucene/monitor/src/test/org/apache/lucene/monitor/TestQueryTermFilter.java index 9deacc1316ef..01338b743e17 100644 --- a/lucene/monitor/src/test/org/apache/lucene/monitor/TestQueryTermFilter.java +++ b/lucene/monitor/src/test/org/apache/lucene/monitor/TestQueryTermFilter.java @@ -31,8 +31,8 @@ public class TestQueryTermFilter extends LuceneTestCase { public void testFiltersAreRemoved() throws IOException { - try (QueryIndex qi = - new QueryIndex(new MonitorConfiguration(), new TermFilteredPresearcher())) { + try (WritableQueryIndex qi = + new WritableQueryIndex(new MonitorConfiguration(), new TermFilteredPresearcher())) { qi.commit( Collections.singletonList(new MonitorQuery("1", new TermQuery(new Term(FIELD, "term"))))); assertEquals(1, qi.termFilters.size());
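
For reviewers, a minimal usage sketch of the new API (not part of the patch): one writing Monitor and one read-only Monitor sharing the same FSDirectory-backed query index. The field name, the WhitespaceAnalyzer, and the classic QueryParser used inside the serializer are illustrative assumptions; imports and call-site exception handling are omitted.

    // Both sides need a serializer: a read-only Monitor can only rebuild
    // queries from their stored string form.
    MonitorQuerySerializer serializer =
        MonitorQuerySerializer.fromParser(
            q -> {
              try {
                // Hypothetical parser; substitute whatever produced the stored strings
                return new QueryParser("field", new WhitespaceAnalyzer()).parse(q);
              } catch (ParseException e) {
                throw new IllegalArgumentException(e);
              }
            });

    Path indexPath = Paths.get("queries-index"); // hypothetical location

    // Writer side: registers queries and commits them to the shared Directory.
    MonitorConfiguration writeConfig =
        new MonitorConfiguration()
            .setDirectoryProvider(() -> FSDirectory.open(indexPath), serializer);
    try (Monitor writer = new Monitor(new WhitespaceAnalyzer(), writeConfig)) {
      Query q = new TermQuery(new Term("field", "test"));
      writer.register(new MonitorQuery("query1", q, q.toString(), Collections.emptyMap()));
    }

    // Reader side: the third argument marks the Monitor read-only, so register(),
    // deleteById() and clear() throw UnsupportedOperationException. Newly committed
    // queries become visible on the background refresh (every purge interval) or
    // after an explicit purgeCache() call.
    MonitorConfiguration readConfig =
        new MonitorConfiguration()
            .setPurgeFrequency(10, TimeUnit.SECONDS)
            .setDirectoryProvider(() -> FSDirectory.open(indexPath), serializer, true);
    try (Monitor reader = new Monitor(new WhitespaceAnalyzer(), readConfig)) {
      reader.purgeCache(); // reopens the searcher over the shared index
      Document doc = new Document();
      doc.add(new TextField("field", "a test document", Field.Store.NO));
      MatchingQueries<QueryMatch> matches = reader.match(doc, QueryMatch.SIMPLE_MATCHER);
    }

Note that purgeCache() has different semantics in the two modes: a writable Monitor rebuilds its in-memory query cache, while a read-only Monitor keeps no cache at all and simply calls SearcherManager.maybeRefresh() to reopen the shared index, deserializing and decomposing each stored query lazily at match time.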