Skip to content

Commit

Permalink
Simplify SliceExecutor and QueueSizeBasedExecutor (#12285)
Browse files Browse the repository at this point in the history
The only behaviour that QueueSizeBasedExecutor overrides from SliceExecutor is when to execute on the caller thread. There is no need to override the whole invokeAll method for that. Instead, this commit introduces a shouldExecuteOnCallerThread method that can be overridden.
  • Loading branch information
javanna committed May 11, 2023
1 parent 37b97a6 commit 784830a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.lucene.search;

import java.util.Collection;
import java.util.concurrent.ThreadPoolExecutor;

/**
Expand All @@ -30,31 +29,15 @@ class QueueSizeBasedExecutor extends SliceExecutor {

private final ThreadPoolExecutor threadPoolExecutor;

public QueueSizeBasedExecutor(ThreadPoolExecutor threadPoolExecutor) {
QueueSizeBasedExecutor(ThreadPoolExecutor threadPoolExecutor) {
super(threadPoolExecutor);
this.threadPoolExecutor = threadPoolExecutor;
}

@Override
public void invokeAll(Collection<? extends Runnable> tasks) {
int i = 0;

for (Runnable task : tasks) {
boolean shouldExecuteOnCallerThread = false;

// Execute last task on caller thread
if (i == tasks.size() - 1) {
shouldExecuteOnCallerThread = true;
}

if (threadPoolExecutor.getQueue().size()
>= (threadPoolExecutor.getMaximumPoolSize() * LIMITING_FACTOR)) {
shouldExecuteOnCallerThread = true;
}

processTask(task, shouldExecuteOnCallerThread);

++i;
}
boolean shouldExecuteOnCallerThread(int index, int numTasks) {
return super.shouldExecuteOnCallerThread(index, numTasks)
|| threadPoolExecutor.getQueue().size()
>= (threadPoolExecutor.getMaximumPoolSize() * LIMITING_FACTOR);
}
}
57 changes: 17 additions & 40 deletions lucene/core/src/java/org/apache/lucene/search/SliceExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.lucene.search;

import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

Expand All @@ -28,54 +29,30 @@
class SliceExecutor {
private final Executor executor;

public SliceExecutor(Executor executor) {
this.executor = executor;
SliceExecutor(Executor executor) {
this.executor = Objects.requireNonNull(executor, "Executor is null");
}

public void invokeAll(Collection<? extends Runnable> tasks) {

if (tasks == null) {
throw new IllegalArgumentException("Tasks is null");
}

if (executor == null) {
throw new IllegalArgumentException("Executor is null");
}

final void invokeAll(Collection<? extends Runnable> tasks) {
int i = 0;

for (Runnable task : tasks) {
boolean shouldExecuteOnCallerThread = false;

// Execute last task on caller thread
if (i == tasks.size() - 1) {
shouldExecuteOnCallerThread = true;
if (shouldExecuteOnCallerThread(i, tasks.size())) {
task.run();
} else {
try {
executor.execute(task);
} catch (
@SuppressWarnings("unused")
RejectedExecutionException e) {
task.run();
}
}

processTask(task, shouldExecuteOnCallerThread);
++i;
}
;
}

// Helper method to execute a single task
protected void processTask(final Runnable task, final boolean shouldExecuteOnCallerThread) {
if (task == null) {
throw new IllegalArgumentException("Input is null");
}

if (!shouldExecuteOnCallerThread) {
try {
executor.execute(task);

return;
} catch (
@SuppressWarnings("unused")
RejectedExecutionException e) {
// Execute on caller thread
}
}

task.run();
boolean shouldExecuteOnCallerThread(int index, int numTasks) {
// Execute last task on caller thread
return index == numTasks - 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -453,20 +453,15 @@ private void runSliceExecutorTest(ThreadPoolExecutor service, boolean useRandomS
}
}

private class RandomBlockingSliceExecutor extends SliceExecutor {
private static class RandomBlockingSliceExecutor extends SliceExecutor {

public RandomBlockingSliceExecutor(Executor executor) {
RandomBlockingSliceExecutor(Executor executor) {
super(executor);
}

@Override
public void invokeAll(Collection<? extends Runnable> tasks) {

for (Runnable task : tasks) {
boolean shouldExecuteOnCallerThread = random().nextBoolean();

processTask(task, shouldExecuteOnCallerThread);
}
boolean shouldExecuteOnCallerThread(int index, int numTasks) {
return random().nextBoolean();
}
}
}

0 comments on commit 784830a

Please sign in to comment.