From 77f1cea0d3438520000077575e2630247bc0ef84 Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Mon, 16 Jan 2023 10:13:57 +0100 Subject: [PATCH] Make readBuffered blocking and add more readBuffered methods, fixes #757 (#782) --- .../impl/AbstractWindowsTerminalTest.java | 10 +++ .../org/jline/terminal/impl/AbstractPty.java | 5 -- .../java/org/jline/utils/NonBlocking.java | 73 +++++++++---------- .../jline/utils/NonBlockingInputStream.java | 26 ++++++- .../utils/NonBlockingInputStreamImpl.java | 13 +--- .../utils/NonBlockingPumpInputStream.java | 39 +++++----- .../jline/utils/NonBlockingPumpReader.java | 18 +++-- .../org/jline/utils/NonBlockingReader.java | 10 ++- .../jline/utils/NonBlockingReaderImpl.java | 30 ++++---- .../main/java/org/jline/utils/Timeout.java | 48 ++++++++++++ .../java/org/jline/utils/NonBlockingTest.java | 57 +++++++++++++++ 11 files changed, 230 insertions(+), 99 deletions(-) create mode 100644 terminal/src/main/java/org/jline/utils/Timeout.java diff --git a/reader/src/test/java/org/jline/terminal/impl/AbstractWindowsTerminalTest.java b/reader/src/test/java/org/jline/terminal/impl/AbstractWindowsTerminalTest.java index fdd843166..3fb2620e1 100644 --- a/reader/src/test/java/org/jline/terminal/impl/AbstractWindowsTerminalTest.java +++ b/reader/src/test/java/org/jline/terminal/impl/AbstractWindowsTerminalTest.java @@ -52,6 +52,16 @@ public void testBracketingPasteHuge() throws Exception { for (int i = 0; i < 100000; i++) { str.append("0123456789"); } + str.toString().chars().forEachOrdered(c -> process(terminal, c) ); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + str.setLength(0); + for (int i = 0; i < 100000; i++) { + str.append("0123456789"); + } str.append(LineReaderImpl.BRACKETED_PASTE_END); str.append("\n"); str.toString().chars().forEachOrdered(c -> process(terminal, c)); diff --git a/terminal/src/main/java/org/jline/terminal/impl/AbstractPty.java b/terminal/src/main/java/org/jline/terminal/impl/AbstractPty.java index 2ffe4bd45..3b52e4a71 100644 --- a/terminal/src/main/java/org/jline/terminal/impl/AbstractPty.java +++ b/terminal/src/main/java/org/jline/terminal/impl/AbstractPty.java @@ -86,11 +86,6 @@ public int read(long timeout, boolean isPeek) throws IOException { } } - @Override - public int readBuffered(byte[] b) throws IOException { - return in.read(b); - } - private void setNonBlocking() { if (current == null || current.getControlChar(Attributes.ControlChar.VMIN) != 0 diff --git a/terminal/src/main/java/org/jline/utils/NonBlocking.java b/terminal/src/main/java/org/jline/utils/NonBlocking.java index 728cbb218..e44dd5280 100644 --- a/terminal/src/main/java/org/jline/utils/NonBlocking.java +++ b/terminal/src/main/java/org/jline/utils/NonBlocking.java @@ -95,13 +95,9 @@ public void close() throws IOException { @Override public int read(long timeout, boolean isPeek) throws IOException { - boolean isInfinite = (timeout <= 0L); - while (!bytes.hasRemaining() && (isInfinite || timeout > 0L)) { - long start = 0; - if (!isInfinite) { - start = System.currentTimeMillis(); - } - int c = reader.read(timeout); + Timeout t = new Timeout(timeout); + while (!bytes.hasRemaining() && !t.elapsed()) { + int c = reader.read(t.timeout()); if (c == EOF) { return EOF; } @@ -117,9 +113,6 @@ public int read(long timeout, boolean isPeek) throws IOException { encoder.encode(chars, bytes, false); bytes.flip(); } - if (!isInfinite) { - timeout -= System.currentTimeMillis() - start; - } } if (bytes.hasRemaining()) { if (isPeek) { @@ -151,21 +144,17 @@ public NonBlockingInputStreamReader(NonBlockingInputStream inputStream, Charset public NonBlockingInputStreamReader(NonBlockingInputStream input, CharsetDecoder decoder) { this.input = input; this.decoder = decoder; - this.bytes = ByteBuffer.allocate(4); - this.chars = CharBuffer.allocate(2); + this.bytes = ByteBuffer.allocate(2048); + this.chars = CharBuffer.allocate(1024); this.bytes.limit(0); this.chars.limit(0); } @Override protected int read(long timeout, boolean isPeek) throws IOException { - boolean isInfinite = (timeout <= 0L); - while (!chars.hasRemaining() && (isInfinite || timeout > 0L)) { - long start = 0; - if (!isInfinite) { - start = System.currentTimeMillis(); - } - int b = input.read(timeout); + Timeout t = new Timeout(timeout); + while (!chars.hasRemaining() && !t.elapsed()) { + int b = input.read(t.timeout()); if (b == EOF) { return EOF; } @@ -181,10 +170,6 @@ protected int read(long timeout, boolean isPeek) throws IOException { decoder.decode(bytes, chars, false); chars.flip(); } - - if (!isInfinite) { - timeout -= System.currentTimeMillis() - start; - } } if (chars.hasRemaining()) { if (isPeek) { @@ -198,29 +183,37 @@ protected int read(long timeout, boolean isPeek) throws IOException { } @Override - public int readBuffered(char[] b) throws IOException { + public int readBuffered(char[] b, int off, int len, long timeout) throws IOException { if (b == null) { throw new NullPointerException(); - } else if (b.length == 0) { + } else if (off < 0 || len < 0 || off + len < b.length) { + throw new IllegalArgumentException(); + } else if (len == 0) { return 0; + } else if (chars.hasRemaining()) { + int r = Math.min(len, chars.remaining()); + chars.get(b, off, r); + return r; } else { - if (chars.hasRemaining()) { - int r = Math.min(b.length, chars.remaining()); - chars.get(b); - return r; - } else { - byte[] buf = new byte[b.length]; - int l = input.readBuffered(buf); - if (l < 0) { - return l; - } else { - ByteBuffer bytes = ByteBuffer.wrap(buf, 0, l); - CharBuffer chars = CharBuffer.wrap(b); - decoder.decode(bytes, chars, false); - chars.flip(); - return chars.remaining(); + Timeout t = new Timeout(timeout); + while (!chars.hasRemaining() && !t.elapsed()) { + if (!bytes.hasRemaining()) { + bytes.position(0); + bytes.limit(0); + } + int nb = input.readBuffered(bytes.array(), bytes.limit(), + bytes.capacity() - bytes.limit(), t.timeout()); + if (nb < 0) { + return nb; } + bytes.limit(bytes.limit() + nb); + chars.clear(); + decoder.decode(bytes, chars, false); + chars.flip(); } + int nb = Math.min(len, chars.remaining()); + chars.get(b, off, nb); + return nb; } } diff --git a/terminal/src/main/java/org/jline/utils/NonBlockingInputStream.java b/terminal/src/main/java/org/jline/utils/NonBlockingInputStream.java index 1266b908d..7f71758e6 100644 --- a/terminal/src/main/java/org/jline/utils/NonBlockingInputStream.java +++ b/terminal/src/main/java/org/jline/utils/NonBlockingInputStream.java @@ -79,12 +79,34 @@ public int read(byte b[], int off, int len) throws IOException { } public int readBuffered(byte[] b) throws IOException { + return readBuffered(b, 0L); + } + + public int readBuffered(byte[] b, long timeout) throws IOException { + return readBuffered(b, 0, b.length, timeout); + } + + public int readBuffered(byte[] b, int off, int len, long timeout) throws IOException { if (b == null) { throw new NullPointerException(); - } else if (b.length == 0) { + } else if (off < 0 || len < 0 || off + len < b.length) { + throw new IllegalArgumentException(); + } else if (len == 0) { return 0; } else { - return super.read(b, 0, b.length); + Timeout t = new Timeout(timeout); + int nb = 0; + while (!t.elapsed()) { + int r = read(nb > 0 ? 1 : t.timeout()); + if (r < 0) { + return nb > 0 ? nb : r; + } + b[off + nb++] = (byte) r; + if (nb >= len || t.isInfinite()) { + break; + } + } + return nb; } } diff --git a/terminal/src/main/java/org/jline/utils/NonBlockingInputStreamImpl.java b/terminal/src/main/java/org/jline/utils/NonBlockingInputStreamImpl.java index 3f702b9ab..623a69a26 100644 --- a/terminal/src/main/java/org/jline/utils/NonBlockingInputStreamImpl.java +++ b/terminal/src/main/java/org/jline/utils/NonBlockingInputStreamImpl.java @@ -123,20 +123,17 @@ else if (!isPeek && timeout <= 0L && !threadIsReading) { notifyAll(); } - boolean isInfinite = (timeout <= 0L); - /* * So the thread is currently doing the reading for us. So * now we play the waiting game. */ - while (isInfinite || timeout > 0L) { - long start = System.currentTimeMillis (); - + Timeout t = new Timeout(timeout); + while (!t.elapsed()) { try { if (Thread.interrupted()) { throw new InterruptedException(); } - wait(timeout); + wait(t.timeout()); } catch (InterruptedException e) { exception = (IOException) new InterruptedIOException().initCause(e); @@ -155,10 +152,6 @@ else if (!isPeek && timeout <= 0L && !threadIsReading) { assert exception == null; break; } - - if (!isInfinite) { - timeout -= System.currentTimeMillis() - start; - } } } diff --git a/terminal/src/main/java/org/jline/utils/NonBlockingPumpInputStream.java b/terminal/src/main/java/org/jline/utils/NonBlockingPumpInputStream.java index c0a023b2d..151976b0b 100644 --- a/terminal/src/main/java/org/jline/utils/NonBlockingPumpInputStream.java +++ b/terminal/src/main/java/org/jline/utils/NonBlockingPumpInputStream.java @@ -45,24 +45,17 @@ public OutputStream getOutputStream() { } private int wait(ByteBuffer buffer, long timeout) throws IOException { - boolean isInfinite = (timeout <= 0L); - long end = 0; - if (!isInfinite) { - end = System.currentTimeMillis() + timeout; - } - while (!closed && !buffer.hasRemaining() && (isInfinite || timeout > 0L)) { + Timeout t = new Timeout(timeout); + while (!closed && !buffer.hasRemaining() && !t.elapsed()) { // Wake up waiting readers/writers notifyAll(); try { - wait(timeout); + wait(t.timeout()); checkIoException(); } catch (InterruptedException e) { checkIoException(); throw new InterruptedIOException(); } - if (!isInfinite) { - timeout = end - System.currentTimeMillis(); - } } return buffer.hasRemaining() ? 0 @@ -107,17 +100,25 @@ public synchronized int read(long timeout, boolean isPeek) throws IOException { } @Override - public synchronized int readBuffered(byte[] b) throws IOException { - checkIoException(); - int res = wait(readBuffer, 0L); - if (res >= 0) { - res = 0; - while (res < b.length && readBuffer.hasRemaining()) { - b[res++] = (byte) (readBuffer.get() & 0x00FF); + public synchronized int readBuffered(byte[] b, int off, int len, long timeout) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || off + len < b.length) { + throw new IllegalArgumentException(); + } else if (len == 0) { + return 0; + } else { + checkIoException(); + int res = wait(readBuffer, timeout); + if (res >= 0) { + res = 0; + while (res < len && readBuffer.hasRemaining()) { + b[off + res++] = (byte) (readBuffer.get() & 0x00FF); + } } + rewind(readBuffer, writeBuffer); + return res; } - rewind(readBuffer, writeBuffer); - return res; } public synchronized void setIoException(IOException exception) { diff --git a/terminal/src/main/java/org/jline/utils/NonBlockingPumpReader.java b/terminal/src/main/java/org/jline/utils/NonBlockingPumpReader.java index fe138c55e..550ee6944 100644 --- a/terminal/src/main/java/org/jline/utils/NonBlockingPumpReader.java +++ b/terminal/src/main/java/org/jline/utils/NonBlockingPumpReader.java @@ -106,10 +106,12 @@ protected int read(long timeout, boolean isPeek) throws IOException { } @Override - public int readBuffered(char[] b) throws IOException { + public int readBuffered(char[] b, int off, int len, long timeout) throws IOException { if (b == null) { throw new NullPointerException(); - } else if (b.length == 0) { + } else if (off < 0 || len < 0 || off + len < b.length) { + throw new IllegalArgumentException(); + } else if (len == 0) { return 0; } else { final ReentrantLock lock = this.lock; @@ -117,7 +119,13 @@ public int readBuffered(char[] b) throws IOException { try { if (!closed && count == 0) { try { - notEmpty.await(); + if (timeout > 0) { + if (!notEmpty.await(timeout, TimeUnit.MILLISECONDS)) { + throw new IOException( "Timeout reading" ); + } + } else { + notEmpty.await(); + } } catch (InterruptedException e) { throw (IOException) new InterruptedIOException().initCause(e); } @@ -127,9 +135,9 @@ public int readBuffered(char[] b) throws IOException { } else if (count == 0) { return READ_EXPIRED; } else { - int r = Math.min(b.length, count); + int r = Math.min(len, count); for (int i = 0; i < r; i++) { - b[i] = buffer[read++]; + b[off + i] = buffer[read++]; if (read == buffer.length) { read = 0; } diff --git a/terminal/src/main/java/org/jline/utils/NonBlockingReader.java b/terminal/src/main/java/org/jline/utils/NonBlockingReader.java index fc92edf34..0edab5b2e 100644 --- a/terminal/src/main/java/org/jline/utils/NonBlockingReader.java +++ b/terminal/src/main/java/org/jline/utils/NonBlockingReader.java @@ -85,7 +85,15 @@ public int read(char[] b, int off, int len) throws IOException { return 1; } - public abstract int readBuffered(char[] b) throws IOException; + public int readBuffered(char[] b) throws IOException { + return readBuffered(b, 0L); + } + + public int readBuffered(char[] b, long timeout) throws IOException { + return readBuffered(b, 0, b.length, timeout); + } + + public abstract int readBuffered(char[] b, int off, int len, long timeout) throws IOException; public int available() { return 0; diff --git a/terminal/src/main/java/org/jline/utils/NonBlockingReaderImpl.java b/terminal/src/main/java/org/jline/utils/NonBlockingReaderImpl.java index 5045fd476..0b1beefc0 100644 --- a/terminal/src/main/java/org/jline/utils/NonBlockingReaderImpl.java +++ b/terminal/src/main/java/org/jline/utils/NonBlockingReaderImpl.java @@ -91,10 +91,12 @@ public synchronized boolean ready() throws IOException { } @Override - public int readBuffered(char[] b) throws IOException { + public int readBuffered(char[] b, int off, int len, long timeout) throws IOException { if (b == null) { throw new NullPointerException(); - } else if (b.length == 0) { + } else if (off < 0 || len < 0 || off + len < b.length) { + throw new IllegalArgumentException(); + } else if (len == 0) { return 0; } else if (exception != null) { assert ch == READ_EXPIRED; @@ -105,15 +107,16 @@ public int readBuffered(char[] b) throws IOException { b[0] = (char) ch; ch = READ_EXPIRED; return 1; - } else if (!threadIsReading) { - return in.read(b); + } else if (!threadIsReading && timeout <= 0) { + return in.read(b, off, len); } else { - int c = read(-1, false); + // TODO: rework implementation to read as much as possible + int c = read(timeout, false); if (c >= 0) { - b[0] = (char) c; + b[off] = (char) c; return 1; } else { - return -1; + return c; } } } @@ -158,20 +161,17 @@ else if (!isPeek && timeout <= 0L && !threadIsReading) { notifyAll(); } - boolean isInfinite = (timeout <= 0L); - /* * So the thread is currently doing the reading for us. So * now we play the waiting game. */ - while (isInfinite || timeout > 0L) { - long start = System.currentTimeMillis (); - + Timeout t = new Timeout(timeout); + while (!t.elapsed()) { try { if (Thread.interrupted()) { throw new InterruptedException(); } - wait(timeout); + wait(t.timeout()); } catch (InterruptedException e) { exception = (IOException) new InterruptedIOException().initCause(e); @@ -190,10 +190,6 @@ else if (!isPeek && timeout <= 0L && !threadIsReading) { assert exception == null; break; } - - if (!isInfinite) { - timeout -= System.currentTimeMillis() - start; - } } } diff --git a/terminal/src/main/java/org/jline/utils/Timeout.java b/terminal/src/main/java/org/jline/utils/Timeout.java new file mode 100644 index 000000000..ff1dac92f --- /dev/null +++ b/terminal/src/main/java/org/jline/utils/Timeout.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2002-2018, the original author or authors. + * + * This software is distributable under the BSD license. See the terms of the + * BSD license in the documentation provided with this software. + * + * https://opensource.org/licenses/BSD-3-Clause + */ +package org.jline.utils; + +/** + * Helper class ti use during I/O operations with an eventual timeout. + */ +public class Timeout { + + private final long timeout; + private long cur = 0; + private long end = Long.MAX_VALUE; + + public Timeout(long timeout) { + this.timeout = timeout; + } + + public boolean isInfinite() { + return timeout <= 0; + } + + public boolean isFinite() { + return timeout > 0; + } + + public boolean elapsed() { + if (timeout > 0) { + cur = System.currentTimeMillis(); + if (end == Long.MAX_VALUE) { + end = cur + timeout; + } + return cur >= end; + } else { + return false; + } + } + + public long timeout() { + return timeout > 0 ? Math.max(1, end - cur) : timeout; + } + +} diff --git a/terminal/src/test/java/org/jline/utils/NonBlockingTest.java b/terminal/src/test/java/org/jline/utils/NonBlockingTest.java index a170c87fb..ca3c1d6db 100644 --- a/terminal/src/test/java/org/jline/utils/NonBlockingTest.java +++ b/terminal/src/test/java/org/jline/utils/NonBlockingTest.java @@ -50,6 +50,63 @@ public int read(long timeout, boolean isPeek) throws IOException { assertEquals(-1, nbr.read(100)); } + @Test + public void testNonBlockingReaderBufferedWithNonBufferedInput() throws IOException { + NonBlockingInputStream nbis = new NonBlockingInputStream() { + int idx = 0; + byte[] input = "中英字典".getBytes(StandardCharsets.UTF_8); + @Override + public int read(long timeout, boolean isPeek) throws IOException { + if (idx < input.length) { + return input[idx++] & 0x00FF; + } else { + return -1; + } + } + }; + NonBlockingReader nbr = NonBlocking.nonBlocking("name", nbis, StandardCharsets.UTF_8); + char[] buf = new char[4]; + assertEquals( 1, nbr.readBuffered(buf, 0)); + assertEquals('中', buf[0]); + assertEquals( 1, nbr.readBuffered(buf, 0)); + assertEquals('英', buf[0]); + assertEquals( 1, nbr.readBuffered(buf, 0)); + assertEquals('字', buf[0]); + assertEquals( 1, nbr.readBuffered(buf, 0)); + assertEquals('典', buf[0]); + } + + @Test + public void testNonBlockingReaderBufferedWithBufferedInput() throws IOException { + NonBlockingInputStream nbis = new NonBlockingInputStream() { + int idx = 0; + byte[] input = "中英字典".getBytes(StandardCharsets.UTF_8); + @Override + public int read(long timeout, boolean isPeek) throws IOException { + if (idx < input.length) { + return input[idx++] & 0x00FF; + } else { + return -1; + } + } + @Override + public int readBuffered(byte[] b, int off, int len, long timeout) throws IOException { + int i = 0; + while (i < len && idx < input.length) { + b[off + i++] = input[idx++]; + } + return i > 0 ? i : -1; + } + }; + NonBlockingReader nbr = NonBlocking.nonBlocking("name", nbis, StandardCharsets.UTF_8); + char[] buf = new char[4]; + assertEquals( 4, nbr.readBuffered(buf, 0)); + assertEquals('中', buf[0]); + assertEquals('英', buf[1]); + assertEquals('字', buf[2]); + assertEquals('典', buf[3]); + } + @Test public void testNonBlockingPumpReader() throws IOException { NonBlockingPumpReader nbr = NonBlocking.nonBlockingPumpReader();