Skip to content

Commit

Permalink
Merge pull request #56 from jentfoo/ClientReaderFix
Browse files Browse the repository at this point in the history
4.3 Version with reader invocation improvement
  • Loading branch information
jentfoo authored Jan 31, 2018
2 parents a58eb7b + 5212cd1 commit 4ba04ac
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 35 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Include the litesockets library into your project from maven central:
<dependency>
<groupId>org.threadly</groupId>
<artifactId>litesockets</artifactId>
<version>4.2</version>
<version>4.3</version>
</dependency>
```

Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
group = org.threadly
version = 4.2
threadlyVersion = 5.10
version = 4.3
threadlyVersion = 5.12
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.threadly</groupId>
<artifactId>litesockets</artifactId>
<version>4.1</version>
<version>4.3</version>
<packaging>jar</packaging>

<name>LiteSockets</name>
Expand Down
24 changes: 16 additions & 8 deletions src/main/java/org/threadly/litesockets/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,22 +267,30 @@ protected ByteBuffer provideReadByteBuffer() {

}

protected void callClosers(Throwable error) {
this.getClientsThreadExecutor().execute(()->{
protected void callClosers(boolean invokedOnClientThread, Throwable error) {
runListener(()->{
while(!closerListener.isEmpty()) {
if (error == null) {
closerListener.poll().onClose(this);
} else {
closerListener.poll().onCloseWithError(this, error);
}
}
});
}, invokedOnClientThread);
}

protected void callReader() {
protected void callReader(boolean invokedOnClientThread) {
Runnable readerCaller = this.readerCaller;
if (readerCaller != null) {
getClientsThreadExecutor().execute(readerCaller);
runListener(readerCaller, invokedOnClientThread);
}
}

private void runListener(Runnable listener, boolean invokedOnClientThread) {
if (invokedOnClientThread) {
ExceptionUtils.runRunnable(listener); // make sure errors from listener don't escape
} else {
getClientsThreadExecutor().execute(listener);
}
}

Expand All @@ -304,7 +312,7 @@ protected void addReadBuffer(final ByteBuffer bb) {
readBuffers.add(bb);
end = readBuffers.remaining();
if(end > 0 && start == 0){
callReader();
callReader(true); // we assume all buffers are added from the clients thread
}
}

Expand Down Expand Up @@ -359,7 +367,7 @@ public void addCloseListener(final ClientCloseListener closer) {
} else {
closerListener.add(closer);
if(closed.get() && !closerListener.isEmpty()) {
this.callClosers(null);
this.callClosers(false, null);
}
}
}
Expand All @@ -379,7 +387,7 @@ public void setReader(final Reader reader) {
synchronized(readerLock) {
readerCaller = () -> reader.onRead(this);
if (this.getReadBufferSize() > 0) {
callReader();
callReader(false); // we can't assume this is the reader thread
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,14 @@ protected void shutdownService() {
if(commonSelector != null && commonSelector.isOpen()) {
closeSelector(schedulerPool, commonSelector);
}
while(localNoThreadScheduler.hasTaskReadyToRun()) {
try {
localNoThreadScheduler.tick(null);
} catch(Exception e) {
ExceptionUtils.handleException(e);
}
}
executeSchedulerTasks();
clients.clear();
servers.clear();
}

private void executeSchedulerTasks() {
localNoThreadScheduler.tick(ExceptionUtils::handleException);
}

private void doClientOperations(final Client client) {
SelectionKey sk = client.getChannel().keyFor(commonSelector);
Expand Down Expand Up @@ -181,7 +179,7 @@ public void select(final int delay) {
while(isRunning() && !wakeUp && (runOnce || Clock.accurateForwardProgressingMillis() - startTime <= delay)) {
try {
commonSelector.selectNow(); //We have to do this before we tick for windows
localNoThreadScheduler.tick(null);
executeSchedulerTasks();
commonSelector.selectedKeys().clear();
commonSelector.select(Math.min(delay, SELECT_TIME_MS));
if(isRunning()) {
Expand Down Expand Up @@ -235,7 +233,7 @@ public void select(final int delay) {
//Also for windows bug, canceled keys are not removed till we select again.
//So we just have to at the end of the loop.
commonSelector.selectNow();
localNoThreadScheduler.tick(null);
executeSchedulerTasks();
}
} catch (IOException e) {
//There is really nothing to do here but try again, usually this is because of shutdown.
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/org/threadly/litesockets/TCPClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,8 @@ protected SocketChannel getChannel() {
public void close(Throwable error) {
if(setClose()) {
se.setClientOperations(this);
this.getClientsThreadExecutor().execute(new Runnable() {
@Override
public void run() {
this.getClientsThreadExecutor().execute(() -> {
try {
synchronized(writerLock) {
if(writeFutures.size() > 0) {
final ClosedChannelException cce = new ClosedChannelException();
Expand All @@ -165,9 +164,10 @@ public void run() {
if(sslProcessor != null) {
sslProcessor.failHandshake(error);
}
}});

this.callClosers(error);
} finally {
callClosers(true, error);
}
});
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/threadly/litesockets/UDPClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected void addReadBuffer(final ByteBuffer bb) {
synchronized(readerLock) {
readBuffers.add(bb);
}
callReader();
callReader(true); // buffers should be added from client thread
}

@Override
Expand Down Expand Up @@ -105,7 +105,7 @@ public boolean isClosed() {
@Override
public void close(Throwable error) {
if(this.setClose()) {
callClosers(error);
callClosers(false, error);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public ReuseableMergedByteBuffers duplicateAndClean() {
return mbb;
}



@Override
protected void addToFront(ByteBuffer bb) {
this.availableBuffers.addFirst(bb.duplicate());
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/org/threadly/litesockets/utils/IOUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ public int read(byte[] ba, int offset, int len) throws IOException {
}
synchronized(currentBB) {
while(true) {
if(c.getReadBufferSize() > 0) {
currentBB.add(c.getRead());
ReuseableMergedByteBuffers mbb = c.getRead();
if(mbb.hasRemaining()) {
currentBB.add(mbb);
}
if(currentBB.remaining() >= len) {
currentBB.get(ba, offset, len);
Expand Down Expand Up @@ -201,8 +202,9 @@ public int read() throws IOException {
if(currentBB.remaining() > 0) {
return currentBB.get() & MergedByteBuffers.UNSIGNED_BYTE_MASK;
} else {
if(c.getReadBufferSize() > 0) {
currentBB.add(c.getRead());
ReuseableMergedByteBuffers mbb = c.getRead();
if(mbb.hasRemaining()) {
currentBB.add(mbb);
} else if(isClosed) {
return -1;
} else {
Expand Down

0 comments on commit 4ba04ac

Please sign in to comment.