Skip to content

Commit

Permalink
Merge branch '__rultor'
Browse files Browse the repository at this point in the history
  • Loading branch information
rultor committed Aug 31, 2018
2 parents 4a269a3 + 7d15029 commit 6ce8160
Showing 1 changed file with 47 additions and 36 deletions.
83 changes: 47 additions & 36 deletions src/main/java/io/wring/agents/Routine.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ public final class Routine implements Callable<Integer>, AutoCloseable {
/**
* Executor.
*/
private final transient ScheduledExecutorService executor;
private final transient ScheduledExecutorService ticker;

/**
* Executor.
*/
private final transient ExecutorService executor;

/**
* Ctor.
Expand All @@ -97,17 +102,27 @@ public Routine(final Base bse) {
public Routine(final Base bse, final int total) {
this.base = bse;
this.threads = total;
this.executor = Executors.newSingleThreadScheduledExecutor(
this.ticker = Executors.newSingleThreadScheduledExecutor(
new VerboseThreads(Routine.class)
);
this.executor = Executors.newFixedThreadPool(
this.threads,
new VerboseThreads(
String.format(
"Routine-%04x",
// @checkstyle MagicNumber (1 line)
System.currentTimeMillis() % 0xffffL
)
)
);
}

/**
* Start it.
*/
public void start() {
Sentry.init(Manifests.read("Wring-SentryDsn"));
this.executor.scheduleWithFixedDelay(
this.ticker.scheduleWithFixedDelay(
new VerboseRunnable(
() -> {
try {
Expand All @@ -128,50 +143,45 @@ public void start() {
@Override
@SuppressWarnings("PMD.PrematureDeclaration")
public Integer call() throws InterruptedException {
// @checkstyle MagicNumber (1 line)
if (Thread.getAllStackTraces().size() > 64) {
throw new IllegalStateException(
String.format(
"Too many threads already, can't start: %s",
String.join(
"; ",
new Mapped<>(
thread -> String.format(
"%s/%s/%B/%B",
thread.getName(),
thread.getState(),
thread.isAlive(),
thread.isInterrupted()
),
Thread.getAllStackTraces().keySet()
Thread.getAllStackTraces().forEach(
(thread, stack) -> {
if (thread.getName().contains("Routine-")
&& thread.isInterrupted()) {
Logger.info(
this,
String.format(
"Interrupted thread, cleaning %s/%s/%B/%B",
thread.getName(),
thread.getState(),
thread.isAlive(),
thread.isInterrupted()
)
)
)
);
}
);
try {
thread.join();
} catch (final InterruptedException err) {
Logger.info(
this,
String.format(
"Cleared thread %s",
thread.getName()
)
);
}
}
}
);
final long start = System.currentTimeMillis();
final Collection<Future<?>> futures = new ArrayList<>(this.threads);
final ExecutorService runner = Executors.newFixedThreadPool(
this.threads,
new VerboseThreads(
String.format(
"Routine-%04x",
// @checkstyle MagicNumber (1 line)
System.currentTimeMillis() % 0xffffL
)
)
);
for (final Pipe pipe : this.base.pipes()) {
futures.add(runner.submit(this.job(pipe)));
futures.add(this.executor.submit(this.job(pipe)));
}
try {
for (final Future<?> future : futures) {
future.get(Routine.LAG, TimeUnit.MINUTES);
}
} catch (final ExecutionException | TimeoutException ex) {
throw new IllegalStateException(ex);
} finally {
Routine.close(runner);
}
Logger.info(
this, "%d pipes processed in %[ms]s, threads=%d: %s",
Expand All @@ -190,6 +200,7 @@ public Integer call() throws InterruptedException {
@Override
public void close() {
try {
Routine.close(this.ticker);
Routine.close(this.executor);
} catch (final InterruptedException ex) {
Sentry.capture(ex);
Expand Down

0 comments on commit 6ce8160

Please sign in to comment.