Skip to content

Commit

Permalink
4.2 release to upgrade threadly to 5.10
Browse files Browse the repository at this point in the history
This upgrades to threadly 5.10 as well as includes the minor changes needed to support the new `ListenableFuture` execution optimizations.
  • Loading branch information
jentfoo committed Jan 17, 2018
1 parent 2f1ffbe commit a58eb7b
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Include the litesockets library into your project from maven central:
<dependency>
<groupId>org.threadly</groupId>
<artifactId>litesockets</artifactId>
<version>4.1</version>
<version>4.2</version>
</dependency>
```

Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
group = org.threadly
version = 4.1
threadlyVersion = 5.3
version = 4.2
threadlyVersion = 5.10
10 changes: 8 additions & 2 deletions src/main/java/org/threadly/litesockets/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.threadly.concurrent.SubmitterExecutor;
import org.threadly.concurrent.future.ListenableFuture;
import org.threadly.concurrent.future.SettableListenableFuture;
import org.threadly.litesockets.buffers.MergedByteBuffers;
import org.threadly.litesockets.buffers.ReuseableMergedByteBuffers;
import org.threadly.litesockets.utils.IOUtils;
Expand Down Expand Up @@ -59,6 +60,10 @@ protected Client(final SocketExecuterCommonBase se, final SubmitterExecutor clie
this.se = se;
this.clientExecutor = clientExecutor;
}

protected <T> SettableListenableFuture<T> makeClientSettableListenableFuture() {
return new ClientSettableListenableFuture<>(clientExecutor);
}

/**
* <p>Used by SocketExecuter to set if there was a success or error when connecting, completing the
Expand Down Expand Up @@ -144,8 +149,9 @@ protected Client(final SocketExecuterCommonBase se, final SubmitterExecutor clie
*
* <p>If there is an error connecting {@link #close()} will also be called on the client.</p>
*
* @return A {@link ListenableFuture} that will complete when the socket is connected, or fail if we cant connect.
* @return A {@link ListenableFuture} that will complete when the socket is connected, or fail if we can't connect.
*/
// TODO - change return type to `ListenableFuture<?>` for ls 5.0
public abstract ListenableFuture<Boolean> connect();

/**
Expand Down Expand Up @@ -653,7 +659,7 @@ public interface ClientOptions {
*/
public int getUdpFrameSize();
}

protected class BaseClientOptions implements ClientOptions {
@Override
public boolean setNativeBuffers(boolean enabled) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.threadly.litesockets;

import java.util.concurrent.Executor;

import org.threadly.concurrent.future.SettableListenableFuture;

/**
* Implementation of {@link SettableListenableFuture} to access {@code protected} constructor where
* we can specify the executor the listener will complete on. This is very useful / important
* for litesockets so that optimizations can occur when listeners complete on the client's executor.
*
* @param <T> The type of result returned by this future
*/
public class ClientSettableListenableFuture<T> extends SettableListenableFuture<T> {
// class and constructor needs to be public for `SSLProcessor`
public ClientSettableListenableFuture(Client client) {
this(client.getClientsThreadExecutor());
}

protected ClientSettableListenableFuture(Executor clientExecutor) {
super(false, clientExecutor);
}
}
6 changes: 4 additions & 2 deletions src/main/java/org/threadly/litesockets/TCPClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class TCPClient extends Client {
private final Deque<Pair<Long, SettableListenableFuture<Long>>> writeFutures = new ArrayDeque<>(8);
private final TCPSocketOptions tso = new TCPSocketOptions();
protected final AtomicBoolean startedConnection = new AtomicBoolean(false);
protected final SettableListenableFuture<Boolean> connectionFuture = new SettableListenableFuture<>(false);
protected final SettableListenableFuture<Boolean> connectionFuture;
protected final SocketChannel channel;
protected final InetSocketAddress remoteAddress;

Expand All @@ -63,6 +63,7 @@ public class TCPClient extends Client {
*/
protected TCPClient(final SocketExecuterCommonBase sei, final String host, final int port) throws IOException {
super(sei);
connectionFuture = makeClientSettableListenableFuture();
remoteAddress = new InetSocketAddress(host, port);
channel = SocketChannel.open();
channel.configureBlocking(false);
Expand All @@ -81,6 +82,7 @@ protected TCPClient(final SocketExecuterCommonBase sei, final SocketChannel chan
if(! channel.isOpen()) {
throw new ClosedChannelException();
}
connectionFuture = makeClientSettableListenableFuture();
connectionFuture.setResult(true);
if(channel.isBlocking()) {
channel.configureBlocking(false);
Expand Down Expand Up @@ -227,7 +229,7 @@ public ListenableFuture<?> write(final MergedByteBuffers mbb) {
return FutureUtils.immediateFailureFuture(new IOException("Connection is Closed"));
}
synchronized(writerLock) {
final SettableListenableFuture<Long> slf = new SettableListenableFuture<>(false);
final SettableListenableFuture<Long> slf = makeClientSettableListenableFuture();
lastWriteFuture = slf;
final boolean needNotify = !canWrite();
if(sslProcessor != null && sslProcessor.handShakeStarted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.threadly.concurrent.future.ListenableFuture;
import org.threadly.concurrent.future.SettableListenableFuture;
import org.threadly.litesockets.Client;
import org.threadly.litesockets.ClientSettableListenableFuture;
import org.threadly.litesockets.buffers.MergedByteBuffers;
import org.threadly.litesockets.buffers.ReuseableMergedByteBuffers;
import org.threadly.litesockets.buffers.SimpleMergedByteBuffers;
Expand Down Expand Up @@ -45,7 +46,7 @@ public class SSLProcessor {

private final AtomicBoolean finishedHandshake = new AtomicBoolean(false);
private final AtomicBoolean startedHandshake = new AtomicBoolean(false);
private final SettableListenableFuture<SSLSession> handshakeFuture = new SettableListenableFuture<SSLSession>(false);
private final SettableListenableFuture<SSLSession> handshakeFuture;
private final MergedByteBuffers encryptedReadBuffers = new ReuseableMergedByteBuffers(false);
private final MergedByteBuffers tempBuffers = new ReuseableMergedByteBuffers(false);
private final SSLEngine ssle;
Expand All @@ -54,6 +55,7 @@ public class SSLProcessor {
private ByteBuffer decryptedReadBuffer;

public SSLProcessor(final Client client, final SSLEngine ssle) {
this.handshakeFuture = new ClientSettableListenableFuture<>(client);
this.client = client;
this.ssle = ssle;
}
Expand Down

0 comments on commit a58eb7b

Please sign in to comment.