Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor Improvements LUCENE-10422 #679

Merged
merged 18 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 14 additions & 35 deletions lucene/monitor/src/java/org/apache/lucene/monitor/Monitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.LeafReader;
Expand All @@ -38,7 +36,6 @@
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.NamedThreadFactory;

/**
* A Monitor contains a set of {@link Query} objects with associated IDs, and efficiently matches
Expand All @@ -51,14 +48,8 @@ public class Monitor implements Closeable {

private final QueryIndex queryIndex;

private final List<MonitorUpdateListener> listeners = new ArrayList<>();

private final long commitBatchSize;

private final ScheduledExecutorService purgeExecutor;

private long lastPurged = -1;

/**
* Create a non-persistent Monitor instance with the default term-filtering Presearcher
*
Expand Down Expand Up @@ -100,22 +91,11 @@ public Monitor(Analyzer analyzer, Presearcher presearcher, MonitorConfiguration

this.analyzer = analyzer;
this.presearcher = presearcher;
this.queryIndex = new QueryIndex(configuration, presearcher);

long purgeFrequency = configuration.getPurgeFrequency();
this.purgeExecutor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("cache-purge"));
this.purgeExecutor.scheduleAtFixedRate(
() -> {
try {
purgeCache();
} catch (Throwable e) {
listeners.forEach(l -> l.onPurgeError(e));
}
},
purgeFrequency,
purgeFrequency,
configuration.getPurgeFrequencyUnits());
if (configuration.isReadOnly()) {
this.queryIndex = new ReadonlyQueryIndex(configuration);
} else {
this.queryIndex = new WritableQueryIndex(configuration, presearcher);
}

this.commitBatchSize = configuration.getQueryUpdateBufferSize();
}
Expand All @@ -125,14 +105,16 @@ public Monitor(Analyzer analyzer, Presearcher presearcher, MonitorConfiguration
* Monitor's queryindex
*
* @param listener listener to register
* @throws IllegalStateException when Monitor is readonly
*/
public void addQueryIndexUpdateListener(MonitorUpdateListener listener) {
listeners.add(listener);
queryIndex.addListener(listener);
}

/** @return Statistics for the internal query index and cache */
public QueryCacheStats getQueryCacheStats() {
return new QueryCacheStats(queryIndex.numDocs(), queryIndex.cacheSize(), lastPurged);
public QueryCacheStats getQueryCacheStats() throws IOException {
return new QueryCacheStats(
queryIndex.numDocs(), queryIndex.cacheSize(), queryIndex.getLastPurged());
}

/** Statistics for the query cache and query index */
Expand All @@ -159,17 +141,17 @@ public QueryCacheStats(int queries, int cachedQueries, long lastPurged) {
*
* <p>This is normally called from a background thread at a rate set by configurePurgeFrequency().
*
* <p>When Monitor is in read-only mode, cache is NEVER purged automatically you MUST call it when
* you want new changes.
*
* @throws IOException on IO errors
*/
public void purgeCache() throws IOException {
queryIndex.purgeCache();
lastPurged = System.nanoTime();
listeners.forEach(MonitorUpdateListener::onPurge);
}

@Override
public void close() throws IOException {
purgeExecutor.shutdown();
queryIndex.close();
}

Expand All @@ -192,7 +174,6 @@ public void register(Iterable<MonitorQuery> queries) throws IOException {

private void commit(List<MonitorQuery> updates) throws IOException {
queryIndex.commit(updates);
listeners.forEach(l -> l.afterUpdate(updates));
}

/**
Expand All @@ -213,7 +194,6 @@ public void register(MonitorQuery... queries) throws IOException {
*/
public void deleteById(List<String> queryIds) throws IOException {
queryIndex.deleteQueries(queryIds);
listeners.forEach(l -> l.afterDelete(queryIds));
}

/**
Expand All @@ -233,7 +213,6 @@ public void deleteById(String... queryIds) throws IOException {
*/
public void clear() throws IOException {
queryIndex.clear();
listeners.forEach(MonitorUpdateListener::afterClear);
}

/**
Expand Down Expand Up @@ -287,7 +266,7 @@ public MonitorQuery getQuery(final String queryId) throws IOException {
}

/** @return the number of queries (after decomposition) stored in this Monitor */
public int getDisjunctCount() {
public int getDisjunctCount() throws IOException {
return queryIndex.numDocs();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.IOSupplier;

/** Encapsulates various configuration settings for a Monitor's query index */
public class MonitorConfiguration {
Expand All @@ -35,8 +36,9 @@ public class MonitorConfiguration {
private long purgeFrequency = 5;
private TimeUnit purgeFrequencyUnits = TimeUnit.MINUTES;
private QueryDecomposer queryDecomposer = new QueryDecomposer();
private Path indexPath = null;
private MonitorQuerySerializer serializer;
private boolean readOnly = false;
private IOSupplier<Directory> directoryProvider = () -> new ByteBuffersDirectory();

private static IndexWriterConfig defaultIndexWriterConfig() {
IndexWriterConfig iwc = new IndexWriterConfig(new KeywordAnalyzer());
Expand All @@ -47,16 +49,52 @@ private static IndexWriterConfig defaultIndexWriterConfig() {
return iwc;
}

public boolean isReadOnly() {
return readOnly;
}

public IOSupplier<Directory> getDirectoryProvider() {
return directoryProvider;
}

/**
* Sets a custom directory, with a custom serializer.
*
* <p>You have also the chance to configure the Monitor as read-only.
*
* <p>When the monitor is in read-only mode it loads indexed queries in memory and matches against
* the without refreshing the index, to force the refresh you MUST call purgeCache()
*
* @param directoryProvider lambda to provide the index Directory implementation
* @param serializer the serializer used to store the queries
* @param readOnly set the monitor as read-only
* @return
*/
public MonitorConfiguration setDirectoryProvider(
IOSupplier<Directory> directoryProvider,
MonitorQuerySerializer serializer,
Boolean readOnly) {
this.directoryProvider = directoryProvider;
this.serializer = serializer;
this.readOnly = readOnly;
return this;
}

public MonitorConfiguration setDirectoryProvider(
IOSupplier<Directory> directoryProvider, MonitorQuerySerializer serializer) {
this.directoryProvider = directoryProvider;
this.serializer = serializer;
return this;
}

public MonitorConfiguration setIndexPath(Path indexPath, MonitorQuerySerializer serializer) {
this.indexPath = indexPath;
this.serializer = serializer;
this.directoryProvider = () -> FSDirectory.open(indexPath);
return this;
}

public IndexWriter buildIndexWriter() throws IOException {
Directory directory =
indexPath == null ? new ByteBuffersDirectory() : FSDirectory.open(indexPath);
return new IndexWriter(directory, getIndexWriterConfig());
return new IndexWriter(directoryProvider.get(), getIndexWriterConfig());
}

protected IndexWriterConfig getIndexWriterConfig() {
Expand Down
Loading