Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change grok watch dog to be Matcher based instead of thread based. #48346

Merged
merged 5 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 18 additions & 24 deletions libs/grok/src/main/java/org/elasticsearch/grok/Grok.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,24 @@ public final class Grok {
private final Map<String, String> patternBank;
private final boolean namedCaptures;
private final Regex compiledExpression;
private final ThreadWatchdog threadWatchdog;
private final MatcherWatchdog matcherWatchdog;

public Grok(Map<String, String> patternBank, String grokPattern) {
this(patternBank, grokPattern, true, ThreadWatchdog.noop());
this(patternBank, grokPattern, true, MatcherWatchdog.noop());
}

public Grok(Map<String, String> patternBank, String grokPattern, ThreadWatchdog threadWatchdog) {
this(patternBank, grokPattern, true, threadWatchdog);
public Grok(Map<String, String> patternBank, String grokPattern, MatcherWatchdog matcherWatchdog) {
this(patternBank, grokPattern, true, matcherWatchdog);
}

Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures) {
this(patternBank, grokPattern, namedCaptures, ThreadWatchdog.noop());
this(patternBank, grokPattern, namedCaptures, MatcherWatchdog.noop());
}

private Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures, ThreadWatchdog threadWatchdog) {
private Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures, MatcherWatchdog matcherWatchdog) {
this.patternBank = patternBank;
this.namedCaptures = namedCaptures;
this.threadWatchdog = threadWatchdog;
this.matcherWatchdog = matcherWatchdog;

for (Map.Entry<String, String> entry : patternBank.entrySet()) {
String name = entry.getKey();
Expand Down Expand Up @@ -168,14 +168,12 @@ public String toRegex(String grokPattern) {

int result;
try {
threadWatchdog.register();
result = matcher.searchInterruptible(0, grokPatternBytes.length, Option.NONE);
} catch (InterruptedException e) {
result = Matcher.INTERRUPTED;
matcherWatchdog.register(matcher);
result = matcher.search(0, grokPatternBytes.length, Option.NONE);
} finally {
threadWatchdog.unregister();
matcherWatchdog.unregister(matcher);
}
if (result != -1) {
if (result >= 0) {
Region region = matcher.getEagerRegion();
String namedPatternRef = groupMatch(NAME_GROUP, region, grokPattern);
String subName = groupMatch(SUBNAME_GROUP, region, grokPattern);
Expand Down Expand Up @@ -219,12 +217,10 @@ public boolean match(String text) {
Matcher matcher = compiledExpression.matcher(text.getBytes(StandardCharsets.UTF_8));
int result;
try {
threadWatchdog.register();
result = matcher.searchInterruptible(0, text.length(), Option.DEFAULT);
} catch (InterruptedException e) {
result = Matcher.INTERRUPTED;
matcherWatchdog.register(matcher);
result = matcher.search(0, text.length(), Option.DEFAULT);
} finally {
threadWatchdog.unregister();
matcherWatchdog.unregister(matcher);
}
return (result != -1);
Copy link
Contributor

@droberts195 droberts195 Oct 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is inconsistent with the Javadoc. The Javadoc says the return value is "true if grok expression matches text, false otherwise". But given the current code it should say "true if grok expression matches text or there is a timeout, false otherwise".

Probably a better fix would be to change this line to return (result >= 0);. It looks like ML is the only component outside of test code that calls this method. If the ML file structure finder is told there's a match when actually there's a timeout then it will move onto the next step but then time out almost immediately afterwards when the overall elapsed time is checked during that next step, so the net effect is still that the endpoint times out. So from an ML perspective I don't mind whether you change this line or not. But it might be best to make the return value more intuitive before someone else uses this method in production code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it might be best to make the return value more intuitive before someone else uses this method in production code.

Agreed. I will change the jdocs in this PR and in a followup will do the change that you're suggesting here.

}
Expand All @@ -241,16 +237,14 @@ public Map<String, Object> captures(String text) {
Matcher matcher = compiledExpression.matcher(textAsBytes);
int result;
try {
threadWatchdog.register();
result = matcher.searchInterruptible(0, textAsBytes.length, Option.DEFAULT);
} catch (InterruptedException e) {
result = Matcher.INTERRUPTED;
matcherWatchdog.register(matcher);
result = matcher.search(0, textAsBytes.length, Option.DEFAULT);
} finally {
threadWatchdog.unregister();
matcherWatchdog.unregister(matcher);
}
if (result == Matcher.INTERRUPTED) {
throw new RuntimeException("grok pattern matching was interrupted after [" +
threadWatchdog.maxExecutionTimeInMillis() + "] ms");
matcherWatchdog.maxExecutionTimeInMillis() + "] ms");
} else if (result == Matcher.FAILED) {
// TODO: I think we should throw an error here?
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.elasticsearch.grok;

import org.joni.Matcher;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -27,88 +29,92 @@

/**
* Protects against long running operations that happen between the register and unregister invocations.
* Threads that invoke {@link #register()}, but take too long to invoke the {@link #unregister()} method
* Threads that invoke {@link #register(Matcher)}, but take too long to invoke the {@link #unregister(Matcher)} method
* will be interrupted.
*
* This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because
* it can end up spinning endlessly if the regular expression is too complex. Joni has checks
* that for every 30k iterations it checks if the current thread is interrupted and if so
* returns {@link org.joni.Matcher#INTERRUPTED}.
*/
public interface ThreadWatchdog {
public interface MatcherWatchdog {

/**
* Registers the current thread and interrupts the current thread
* if the takes too long for this thread to invoke {@link #unregister()}.
* Registers the current matcher and interrupts the this matcher
* if the takes too long for this thread to invoke {@link #unregister(Matcher)}.
*
* @param matcher The matcher to register
*/
void register();
void register(Matcher matcher);

/**
* @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister()}
* after {@link #register()} has been invoked before this ThreadWatchDog starts to interrupting that thread.
* @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister(Matcher)}
* after {@link #register(Matcher)} has been invoked before this ThreadWatchDog starts to interrupting that thread.
*/
long maxExecutionTimeInMillis();

/**
* Unregisters the current thread and prevents it from being interrupted.
* Unregisters the current matcher and prevents it from being interrupted.
*
* @param matcher The matcher to unregister
*/
void unregister();
void unregister(Matcher matcher);

/**
* Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register()}
* and not {@link #unregister()} and have been in this state for longer than the specified max execution interval and
* Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register(Matcher)}
* and not {@link #unregister(Matcher)} and have been in this state for longer than the specified max execution interval and
* then interrupts these threads.
*
* @param interval The fixed interval to check if there are threads to interrupt
* @param maxExecutionTime The time a thread has the execute an operation.
* @param relativeTimeSupplier A supplier that returns relative time
* @param scheduler A scheduler that is able to execute a command for each fixed interval
*/
static ThreadWatchdog newInstance(long interval,
static MatcherWatchdog newInstance(long interval,
long maxExecutionTime,
LongSupplier relativeTimeSupplier,
BiConsumer<Long, Runnable> scheduler) {
return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler);
}

/**
* @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions.
*/
static ThreadWatchdog noop() {
static MatcherWatchdog noop() {
return Noop.INSTANCE;
}
class Noop implements ThreadWatchdog {

class Noop implements MatcherWatchdog {

private static final Noop INSTANCE = new Noop();

private Noop() {
}

@Override
public void register() {
public void register(Matcher matcher) {
}

@Override
public long maxExecutionTimeInMillis() {
return Long.MAX_VALUE;
}

@Override
public void unregister() {
public void unregister(Matcher matcher) {
}
}
class Default implements ThreadWatchdog {

class Default implements MatcherWatchdog {

private final long interval;
private final long maxExecutionTime;
private final LongSupplier relativeTimeSupplier;
private final BiConsumer<Long, Runnable> scheduler;
private final AtomicInteger registered = new AtomicInteger(0);
private final AtomicBoolean running = new AtomicBoolean(false);
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();
final ConcurrentHashMap<Matcher, Long> registry = new ConcurrentHashMap<>();

private Default(long interval,
long maxExecutionTime,
LongSupplier relativeTimeSupplier,
Expand All @@ -118,30 +124,30 @@ private Default(long interval,
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
}
public void register() {

public void register(Matcher matcher) {
registered.getAndIncrement();
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
Long previousValue = registry.put(matcher, relativeTimeSupplier.getAsLong());
if (running.compareAndSet(false, true) == true) {
scheduler.accept(interval, this::interruptLongRunningExecutions);
}
assert previousValue == null;
}

@Override
public long maxExecutionTimeInMillis() {
return maxExecutionTime;
}
public void unregister() {
Long previousValue = registry.remove(Thread.currentThread());

public void unregister(Matcher matcher) {
Long previousValue = registry.remove(matcher);
registered.decrementAndGet();
assert previousValue != null;
}

private void interruptLongRunningExecutions() {
final long currentRelativeTime = relativeTimeSupplier.getAsLong();
for (Map.Entry<Thread, Long> entry : registry.entrySet()) {
for (Map.Entry<Matcher, Long> entry : registry.entrySet()) {
if ((currentRelativeTime - entry.getValue()) > maxExecutionTime) {
entry.getKey().interrupt();
// not removing the entry here, this happens in the unregister() method.
Expand All @@ -153,7 +159,7 @@ private void interruptLongRunningExecutions() {
running.set(false);
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ public void testExponentialExpressions() {
});
t.start();
};
Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
Grok grok = new Grok(basePatterns, grokPattern, MatcherWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine));
run.set(false);
assertThat(e.getMessage(), equalTo("grok pattern matching was interrupted after [200] ms"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,24 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.elasticsearch.test.ESTestCase;
import org.joni.Matcher;
import org.mockito.Mockito;

import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;

public class ThreadWatchdogTests extends ESTestCase {
public class MatcherWatchdogTests extends ESTestCase {

public void testInterrupt() throws Exception {
AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed
ThreadWatchdog watchdog = ThreadWatchdog.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> {
MatcherWatchdog watchdog = MatcherWatchdog.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Expand All @@ -53,17 +55,17 @@ public void testInterrupt() throws Exception {
thread.start();
});

Map<?, ?> registry = ((ThreadWatchdog.Default) watchdog).registry;
Map<?, ?> registry = ((MatcherWatchdog.Default) watchdog).registry;
assertThat(registry.size(), is(0));
// need to call #register() method on a different thread, assertBusy() fails if current thread gets interrupted
AtomicBoolean interrupted = new AtomicBoolean(false);
Thread thread = new Thread(() -> {
Thread currentThread = Thread.currentThread();
watchdog.register();
while (currentThread.isInterrupted() == false) {}
Matcher matcher = mock(Matcher.class);
watchdog.register(matcher);
verify(matcher, timeout(9999).times(1)).interrupt();
interrupted.set(true);
while (run.get()) {} // wait here so that the size of the registry can be asserted
watchdog.unregister();
watchdog.unregister(matcher);
});
thread.start();
assertBusy(() -> {
Expand All @@ -79,7 +81,7 @@ public void testInterrupt() throws Exception {
public void testIdleIfNothingRegistered() throws Exception {
long interval = 1L;
ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class);
ThreadWatchdog watchdog = ThreadWatchdog.newInstance(interval, Long.MAX_VALUE, System::currentTimeMillis,
MatcherWatchdog watchdog = MatcherWatchdog.newInstance(interval, Long.MAX_VALUE, System::currentTimeMillis,
(delay, command) -> threadPool.schedule(command, delay, TimeUnit.MILLISECONDS));
// Periodic action is not scheduled because no thread is registered
verifyZeroInteractions(threadPool);
Expand All @@ -91,16 +93,20 @@ public void testIdleIfNothingRegistered() throws Exception {
}).when(threadPool).schedule(
any(Runnable.class), eq(interval), eq(TimeUnit.MILLISECONDS)
);
watchdog.register();
Matcher matcher = mock(Matcher.class);
watchdog.register(matcher);
// Registering the first thread should have caused the command to get scheduled again
Runnable command = commandFuture.get(1L, TimeUnit.MILLISECONDS);
Mockito.reset(threadPool);
watchdog.unregister();
watchdog.unregister(matcher);
command.run();
// Periodic action is not scheduled again because no thread is registered
verifyZeroInteractions(threadPool);
watchdog.register();
Thread otherThread = new Thread(watchdog::register);
watchdog.register(matcher);
Thread otherThread = new Thread(() -> {
Matcher otherMatcher = mock(Matcher.class);
watchdog.register(otherMatcher);
});
try {
verify(threadPool).schedule(any(Runnable.class), eq(interval), eq(TimeUnit.MILLISECONDS));
// Registering a second thread does not cause the command to get scheduled twice
Expand Down
Loading