diff --git a/pom.xml b/pom.xml index 5aaceb77..49e24e97 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ org.jenkins-ci.plugins plugin - 3.21 + 3.25 org.jenkins-ci.plugins.workflow @@ -64,22 +64,22 @@ 2.31 -SNAPSHOT - 2.121 + 2.121.1 8 false - 2.14 + 2.21 true org.jenkins-ci.plugins.workflow workflow-step-api - 2.10 + 2.16 org.jenkins-ci.plugins scm-api - 2.0.8 + 2.2.6 org.jenkins-ci.plugins.workflow @@ -90,13 +90,13 @@ org.jenkins-ci.plugins.workflow workflow-job - 2.11.1 + 2.26 test org.jenkins-ci.plugins.workflow workflow-cps - 2.33 + 2.58 test @@ -121,13 +121,13 @@ org.jenkins-ci.plugins.workflow workflow-durable-task-step - 2.8 + 2.22 test org.jenkins-ci.plugins script-security - 1.27 + 1.46 test @@ -139,7 +139,7 @@ org.jenkins-ci.plugins structs - 1.14 + 1.17 org.jenkins-ci.test diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java b/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java new file mode 100644 index 00000000..c0ed079d --- /dev/null +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java @@ -0,0 +1,82 @@ +/* + * The MIT License + * + * Copyright 2018 CloudBees, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package org.jenkinsci.plugins.workflow.log; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import hudson.CloseProofOutputStream; +import hudson.model.BuildListener; +import hudson.remoting.RemoteOutputStream; +import hudson.util.StreamTaskListener; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import org.jenkinsci.remoting.SerializableOnlyOverRemoting; + +/** + * Unlike {@link StreamTaskListener} this does not set {@code autoflush} on the reconstructed {@link PrintStream}. + * It also wraps on the remote side in {@link DelayBufferedOutputStream}. + */ +final class BufferedBuildListener implements BuildListener, Closeable, SerializableOnlyOverRemoting { + + private final OutputStream out; + @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "using Replacement anyway, fields here are irrelevant") + private final PrintStream ps; + + BufferedBuildListener(OutputStream out) throws IOException { + this.out = out; + ps = new PrintStream(out, false, "UTF-8"); + } + + @Override public PrintStream getLogger() { + return ps; + } + + @Override public void close() throws IOException { + ps.close(); + } + + private Object writeReplace() { + return new Replacement(this); + } + + private static final class Replacement implements SerializableOnlyOverRemoting { + + private static final long serialVersionUID = 1; + + private final RemoteOutputStream ros; + private final DelayBufferedOutputStream.Tuning tuning = DelayBufferedOutputStream.Tuning.DEFAULT; // load defaults on master + + Replacement(BufferedBuildListener cbl) { + this.ros = new RemoteOutputStream(new CloseProofOutputStream(cbl.out)); + } + + private Object readResolve() throws IOException { + return new BufferedBuildListener(new DelayBufferedOutputStream(ros, tuning)); + } + + } + +} diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java new file mode 100644 index 00000000..2fd4c949 --- /dev/null +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java @@ -0,0 +1,148 @@ +/* + * The MIT License + * + * Copyright 2018 CloudBees, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package org.jenkinsci.plugins.workflow.log; + +import java.io.BufferedOutputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import jenkins.util.Timer; +import org.jenkinsci.remoting.SerializableOnlyOverRemoting; + +/** + * Buffered output stream which is guaranteed to deliver content after some time even if idle and the buffer does not fill up. + * The automatic “flushing” does not flush the underlying stream, for example via {@code ProxyOutputStream.Flush}. + */ +final class DelayBufferedOutputStream extends BufferedOutputStream { + + private static final Logger LOGGER = Logger.getLogger(DelayBufferedOutputStream.class.getName()); + + static final class Tuning implements SerializableOnlyOverRemoting { + private Tuning() {} + // nonfinal for Groovy scripting: + long minRecurrencePeriod = Long.getLong(DelayBufferedOutputStream.class.getName() + ".minRecurrencePeriod", 1_000); // 1s + long maxRecurrencePeriod = Long.getLong(DelayBufferedOutputStream.class.getName() + ".maxRecurrencePeriod", 10_000); // 10s + float recurrencePeriodBackoff = Float.parseFloat(System.getProperty(DelayBufferedOutputStream.class.getName() + ".recurrencePeriodBackoff", "1.05")); + int bufferSize = Integer.getInteger(DelayBufferedOutputStream.class.getName() + ".bufferSize", 1 << 16); // 64Kib + static final Tuning DEFAULT = new Tuning(); + } + + private final Tuning tuning; + private long recurrencePeriod; + + DelayBufferedOutputStream(OutputStream out) { + this(out, Tuning.DEFAULT); + } + + DelayBufferedOutputStream(OutputStream out, Tuning tuning) { + super(new FlushControlledOutputStream(out), tuning.bufferSize); + this.tuning = tuning; + recurrencePeriod = tuning.minRecurrencePeriod; + reschedule(); + } + + private void reschedule() { + Timer.get().schedule(new Flush(this), recurrencePeriod, TimeUnit.MILLISECONDS); + recurrencePeriod = Math.min((long) (recurrencePeriod * tuning.recurrencePeriodBackoff), tuning.maxRecurrencePeriod); + } + + /** We can only call {@link BufferedOutputStream#flushBuffer} via {@link #flush}, but we do not wish to flush the underlying stream, only write out the buffer. */ + private void flushBuffer() throws IOException { + ThreadLocal enableFlush = ((FlushControlledOutputStream) out).enableFlush; + boolean orig = enableFlush.get(); + enableFlush.set(false); + try { + flush(); + } finally { + enableFlush.set(orig); + } + } + + void flushAndReschedule() { + // TODO as an optimization, avoid flushing the buffer if it was recently flushed anyway due to filling up + try { + flushBuffer(); + } catch (IOException x) { + LOGGER.log(Level.FINE, null, x); + } + reschedule(); + } + + @SuppressWarnings("FinalizeDeclaration") // not ideal, but PhantomReference is more of a hassle + @Override protected void finalize() throws Throwable { + super.finalize(); + // Odd that this is not the default behavior for BufferedOutputStream. + flush(); + } + + private static final class Flush implements Runnable { + + /** Since there is no explicit close event, just keep flushing periodically until the stream is collected. */ + private final Reference osr; + + Flush(DelayBufferedOutputStream os) { + osr = new WeakReference<>(os); + } + + @Override public void run() { + DelayBufferedOutputStream os = osr.get(); + if (os != null) { + os.flushAndReschedule(); + } + } + + } + + /** @see DelayBufferedOutputStream#flushBuffer */ + private static final class FlushControlledOutputStream extends FilterOutputStream { + + private final ThreadLocal enableFlush = new ThreadLocal() { + @Override protected Boolean initialValue() { + return true; + } + }; + + FlushControlledOutputStream(OutputStream out) { + super(out); + } + + @Override public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); // super method writes one byte at a time! + } + + @Override public void flush() throws IOException { + if (enableFlush.get()) { + super.flush(); + } + } + + } + +} diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java b/src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java index ba6854fc..d19c7f34 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java @@ -28,9 +28,7 @@ import hudson.console.AnnotatedLargeText; import hudson.console.ConsoleAnnotationOutputStream; import hudson.model.BuildListener; -import hudson.model.StreamBuildListener; import hudson.model.TaskListener; -import hudson.util.StreamTaskListener; import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; @@ -46,6 +44,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.logging.Level; import java.util.logging.Logger; import org.apache.commons.io.input.NullReader; import org.jenkinsci.plugins.workflow.flow.FlowExecutionOwner; @@ -71,8 +70,10 @@ public static synchronized LogStorage forFile(File log) { private final File log; private final File index; - @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "FB apparently gets confused by what the lock is, and anyway we only care about synchronizing writes") + @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "actually it is always accessed within the monitor") private FileOutputStream os; + @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "we only care about synchronizing writes") + private OutputStream bos; private Writer indexOs; private String lastId; @@ -84,6 +85,7 @@ private FileLogStorage(File log) { private synchronized void open() throws IOException { if (os == null) { os = new FileOutputStream(log, true); + bos = new DelayBufferedOutputStream(os); if (index.isFile()) { try (BufferedReader r = Files.newBufferedReader(index.toPath(), StandardCharsets.UTF_8)) { // TODO would be faster to scan the file backwards for the penultimate \n, then convert the byte sequence from there to EOF to UTF-8 and set lastId accordingly @@ -110,16 +112,17 @@ private synchronized void open() throws IOException { } @Override public BuildListener overallListener() throws IOException, InterruptedException { - return new StreamBuildListener(new IndexOutputStream(null), StandardCharsets.UTF_8); + return new BufferedBuildListener(new IndexOutputStream(null)); } @Override public TaskListener nodeListener(FlowNode node) throws IOException, InterruptedException { - return new StreamTaskListener(new IndexOutputStream(node.getId()), StandardCharsets.UTF_8); + return new BufferedBuildListener(new IndexOutputStream(node.getId())); } private void checkId(String id) throws IOException { assert Thread.holdsLock(this); if (!Objects.equals(id, lastId)) { + bos.flush(); long pos = os.getChannel().position(); if (id == null) { indexOs.write(pos + "\n"); @@ -146,33 +149,33 @@ private final class IndexOutputStream extends OutputStream { @Override public void write(int b) throws IOException { synchronized (FileLogStorage.this) { checkId(id); - os.write(b); + bos.write(b); } } @Override public void write(byte[] b) throws IOException { synchronized (FileLogStorage.this) { checkId(id); - os.write(b); + bos.write(b); } } @Override public void write(byte[] b, int off, int len) throws IOException { synchronized (FileLogStorage.this) { checkId(id); - os.write(b, off, len); + bos.write(b, off, len); } } @Override public void flush() throws IOException { - os.flush(); + bos.flush(); } @Override public void close() throws IOException { if (id == null) { openStorages.remove(log); try { - os.close(); + bos.close(); } finally { indexOs.close(); } @@ -181,7 +184,18 @@ private final class IndexOutputStream extends OutputStream { } + private void maybeFlush() { + if (bos != null) { + try { + bos.flush(); + } catch (IOException x) { + LOGGER.log(Level.WARNING, "failed to flush " + log, x); + } + } + } + @Override public AnnotatedLargeText overallLog(FlowExecutionOwner.Executable build, boolean complete) { + maybeFlush(); return new AnnotatedLargeText(log, StandardCharsets.UTF_8, complete, build) { @Override public long writeHtmlTo(long start, Writer w) throws IOException { try (BufferedReader indexBR = index.isFile() ? Files.newBufferedReader(index.toPath(), StandardCharsets.UTF_8) : new BufferedReader(new NullReader(0))) { @@ -239,6 +253,7 @@ private final class IndexOutputStream extends OutputStream { } @Override public AnnotatedLargeText stepLog(FlowNode node, boolean complete) { + maybeFlush(); String id = node.getId(); try (ByteBuffer buf = new ByteBuffer(); RandomAccessFile raf = new RandomAccessFile(log, "r"); diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/TaskListenerDecorator.java b/src/main/java/org/jenkinsci/plugins/workflow/log/TaskListenerDecorator.java index a2e3fd73..023a9998 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/TaskListenerDecorator.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/TaskListenerDecorator.java @@ -158,10 +158,10 @@ public static BuildListener apply(@Nonnull TaskListener listener, @Nonnull FlowE filter(Objects::nonNull). collect(Collectors.toCollection(ArrayList::new)); if (decorators.isEmpty()) { - return BuildListenerAdapter.wrap(listener); + return CloseableTaskListener.of(BuildListenerAdapter.wrap(listener), listener); } else { Collections.reverse(decorators); - return new DecoratedTaskListener(listener, decorators); + return CloseableTaskListener.of(new DecoratedTaskListener(listener, decorators), listener); } } @@ -261,4 +261,39 @@ private static final class DecoratedTaskListener implements BuildListener { } + private static final class CloseableTaskListener implements BuildListener, AutoCloseable { + + static BuildListener of(BuildListener mainDelegate, TaskListener closeDelegate) { + if (closeDelegate instanceof AutoCloseable) { + return new CloseableTaskListener(mainDelegate, closeDelegate); + } else { + return mainDelegate; + } + } + + private static final long serialVersionUID = 1; + + private final @Nonnull TaskListener mainDelegate; + private final @Nonnull TaskListener closeDelegate; + + private CloseableTaskListener(TaskListener mainDelegate, TaskListener closeDelegate) { + this.mainDelegate = mainDelegate; + this.closeDelegate = closeDelegate; + assert closeDelegate instanceof AutoCloseable; + } + + @Override public PrintStream getLogger() { + return mainDelegate.getLogger(); + } + + @Override public void close() throws Exception { + ((AutoCloseable) closeDelegate).close(); + } + + @Override public String toString() { + return "CloseableTaskListener[" + mainDelegate + " / " + closeDelegate + "]"; + } + + } + } diff --git a/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java b/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java index 37139386..c69d215b 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java @@ -37,9 +37,7 @@ import java.util.Random; import java.util.concurrent.Callable; import java.util.function.BiFunction; -import java.util.logging.Logger; import jenkins.security.MasterToSlaveCallable; -import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.NullOutputStream; import org.apache.commons.io.output.NullWriter; import org.apache.commons.io.output.WriterOutputStream; @@ -91,6 +89,7 @@ public abstract class LogStorageTestBase { step1.getLogger().println("one #3"); overall.getLogger().println("pausing"); overallHtmlPos = assertOverallLog(overallHtmlPos, "one #2\none #3\npausing\n", true); + // TODO if we produce output from the middle of a step, we need new span blocks step1Pos = assertStepLog("1", step1Pos, "one #2\none #3\n", true); assertLength("1", step1Pos); try { // as above @@ -149,11 +148,6 @@ protected static void close(TaskListener listener) throws Exception { VirtualChannel channel = r.createOnlineSlave().getChannel(); channel.call(new RemotePrint("overall from agent", overall)); channel.call(new RemotePrint("step from agent", step)); - while (!IOUtils.toString(text().readAll()).contains("overall from agent") || !IOUtils.toString(text().readAll()).contains("step from agent")) { - // TODO current cloud implementations may be unable to honor the completed flag on remotely printed messages, pending some way to have all affected loggers confirm they have flushed - Logger.getLogger(LogStorageTestBase.class.getName()).info("waiting for remote content to appear"); - Thread.sleep(1000); - } overallPos = assertOverallLog(overallPos, "overall from agent\nstep from agent\n", true); stepPos = assertStepLog("1", stepPos, "step from agent\n", true); assertEquals(overallPos, assertOverallLog(overallPos, "", true)); @@ -171,6 +165,7 @@ private static final class RemotePrint extends MasterToSlaveCallable