Skip to content

Commit

Permalink
fix #61 Offer a simple blocking alternative API for client/server
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Jun 28, 2017
1 parent 7b815d8 commit 3521c88
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public ChannelOperations<INBOUND, OUTBOUND> context(Consumer<NettyContext> conte
@Override
public void dispose() {
inbound.cancel();
//TODO shouldn't super.dispose be called there / channel closed?
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ final public void channelRead(ChannelHandlerContext ctx, Object msg)
try {
ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
if (ops != null) {
ChannelOperations.get(ctx.channel()).onInboundNext(ctx, msg);
ops.onInboundNext(ctx, msg);
}
else {
if (log.isDebugEnabled()) {
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/reactor/ipc/netty/tcp/NettyContextFacade.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package reactor.ipc.netty.tcp;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
* Wrap a {@link NettyContext} obtained from a {@link Mono} and offer methods to manage
* its lifecycle in a blocking fashion.
*
* @author Simon Baslé
*/
public class NettyContextFacade {
Expand All @@ -17,14 +22,37 @@ public class NettyContextFacade {
private final NettyContext context;
private final String description;

private Duration lifecycleTimeout;

public NettyContextFacade(Mono<? extends NettyContext> contextAsync,
String description) {
this(contextAsync, description, Duration.ofSeconds(3));
}

public NettyContextFacade(Mono<? extends NettyContext> contextAsync,
String description, Duration lifecycleTimeout) {
this.description = description;
this.lifecycleTimeout = lifecycleTimeout;
this.context = contextAsync
.timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be started within " + lifecycleTimeout.toMillis() + "ms")))
.doOnNext(ctx -> LOG.info("Started {} on {}", description, ctx.address()))
.block();
}

/**
* Change the lifecycle timeout applied to the {@link #stop()} operation (as this can
* only be called AFTER the {@link NettyContext} has been "started").
*
* @param timeout the new timeout to apply on stop.
*/
public void setLifecycleTimeout(Duration timeout) {
this.lifecycleTimeout = timeout;
}

/**
* Get the {@link NettyContext} wrapped by this facade.
* @return the original NettyContext.
*/
public NettyContext getContext() {
return context;
}
Expand All @@ -50,9 +78,13 @@ public String getHost() {
return context.address().getHostString();
}

/**
* Stop the {@link NettyContext} and wait for its termination, up to the {@link #setLifecycleTimeout(Duration) lifecycle timeout}.
*/
public void stop() {
context.dispose();
context.onClose()
.timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
.doOnSuccess(aVoid -> LOG.info("Stopped {} on {}", description, context.address()))
.block();
}
Expand Down
153 changes: 128 additions & 25 deletions src/test/java/reactor/ipc/netty/tcp/NettyContextFacadeTest.java
Original file line number Diff line number Diff line change
@@ -1,57 +1,160 @@
package reactor.ipc.netty.tcp;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyPipeline;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

/**
* @author Simon Baslé
*/
public class NettyContextFacadeTest {

static final NettyContext NEVER_STOP_CONTEXT = new NettyContext() {
@Override
public Channel channel() {
return new EmbeddedChannel();
}

@Override
public InetSocketAddress address() {
return InetSocketAddress.createUnresolved("localhost", 4321);
}

@Override
public Mono<Void> onClose() {
return Mono.never();
}
};

@Test
public void simpleServerFromAsyncServer() throws InterruptedException {
NettyContextFacade simpleServer =
TcpServer.create()
.startSimple((in, out) ->
out.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(
in.receive()
.asString()
.log("SERVER")
.map(s -> "ECHO: " + s)
)
.startSimple((in, out) -> out
.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(
in.receive()
.asString()
.takeUntil(s -> s.endsWith("CONTROL"))
.map(s -> "ECHO: " + s.replaceAll("CONTROL", ""))
.concatWith(Mono.just("DONE"))
)
.neverComplete()
);

System.out.println(simpleServer.getHost());
System.out.println(simpleServer.getPort());

AtomicReference<List<String>> data = new AtomicReference<>();
NettyContext clientContext =
AtomicReference<List<String>> data1 = new AtomicReference<>();
AtomicReference<List<String>> data2 = new AtomicReference<>();

NettyContextFacade simpleClient1 =
TcpClient.create(simpleServer.getPort())
.startSimple((in, out) -> {
return out.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(Flux.just("Hello", "World", "CONTROL"))
.then(in.receive()
.asString()
.takeUntil(s -> s.endsWith("DONE"))
.map(s -> s.replaceAll("DONE", ""))
.filter(s -> !s.isEmpty())
.collectList()
.doOnNext(data1::set)
.doOnNext(System.err::println)
.then());
});

NettyContextFacade simpleClient2 =
TcpClient.create(simpleServer.getPort())
.newHandler((in, out) -> {
in.receive()
.asString()
.log("CLIENT RECEIVED")
.collectList()
.subscribe(data::set);
.startSimple((in, out) -> {
return out.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(Flux.just("How", "Are", "You?", "CONTROL"))
.then(in.receive()
.asString()
.takeUntil(s -> s.endsWith("DONE"))
.map(s -> s.replaceAll("DONE", ""))
.filter(s -> !s.isEmpty())
.collectList()
.doOnNext(data2::set)
.doOnNext(System.err::println)
.then());
});

return out.sendString(Flux.just("Hello", "World")).neverComplete();
})
.block();
Thread.sleep(1000);
System.err.println("STOPPING 1");
simpleClient1.stop();

System.err.println("STOPPING 2");
simpleClient2.stop();

System.err.println("STOPPING SERVER");
simpleServer.stop();

clientContext.dispose();
clientContext.onClose().block();
assertThat(data1.get())
.allSatisfy(s -> assertThat(s).startsWith("ECHO: "));
assertThat(data2.get())
.allSatisfy(s -> assertThat(s).startsWith("ECHO: "));

assertThat(data1.get()
.toString()
.replaceAll("ECHO: ", "")
.replaceAll(", ", ""))
.isEqualTo("[HelloWorld]");
assertThat(data2.get()
.toString()
.replaceAll("ECHO: ", "")
.replaceAll(", ", ""))
.isEqualTo("[HowAreYou?]");
}

@Test
public void testTimeoutOnStart() {
assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() -> new NettyContextFacade(Mono.never(), "TEST NEVER START", Duration.ofMillis(100)))
.withCauseExactlyInstanceOf(TimeoutException.class)
.withMessage("java.util.concurrent.TimeoutException: TEST NEVER START couldn't be started within 100ms");
}

@Test
public void testTimeoutOnStop() {
final NettyContextFacade neverStop =
new NettyContextFacade(Mono.just(NEVER_STOP_CONTEXT), "TEST NEVER STOP", Duration.ofMillis(100));

assertThat(data.get()).containsExactly("ECHO: Hello", "ECHO: World");
assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(neverStop::stop)
.withCauseExactlyInstanceOf(TimeoutException.class)
.withMessage("java.util.concurrent.TimeoutException: TEST NEVER STOP couldn't be stopped within 100ms");
}

@Test
public void testTimeoutOnStopChangedTimeout() {
final NettyContextFacade neverStop =
new NettyContextFacade(Mono.just(NEVER_STOP_CONTEXT), "TEST NEVER STOP", Duration.ofMillis(500));

neverStop.setLifecycleTimeout(Duration.ofMillis(100));

assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(neverStop::stop)
.withCauseExactlyInstanceOf(TimeoutException.class)
.withMessage("java.util.concurrent.TimeoutException: TEST NEVER STOP couldn't be stopped within 100ms");
}

@Test
public void getContextAddressAndHost() {
NettyContextFacade facade = new NettyContextFacade(Mono.just(NEVER_STOP_CONTEXT), "foo");

assertThat(facade.getContext()).isSameAs(NEVER_STOP_CONTEXT);
assertThat(facade.getPort()).isEqualTo(NEVER_STOP_CONTEXT.address().getPort());
assertThat(facade.getHost()).isEqualTo(NEVER_STOP_CONTEXT.address().getHostString());
}
}

0 comments on commit 3521c88

Please sign in to comment.