Skip to content

Commit

Permalink
Test connection alive when transformation errors
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Apr 27, 2018
1 parent 74912c5 commit 23bd118
Showing 1 changed file with 65 additions and 0 deletions.
65 changes: 65 additions & 0 deletions src/test/java/reactor/ipc/netty/http/client/WebsocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,30 @@

package reactor.ipc.netty.http.client;

import java.nio.charset.Charset;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.http.server.HttpServer;
import reactor.ipc.netty.http.websocket.WebsocketInbound;
import reactor.ipc.netty.http.websocket.WebsocketOutbound;
import reactor.ipc.netty.resources.PoolResources;
import reactor.test.StepVerifier;

Expand Down Expand Up @@ -481,4 +487,63 @@ public void testCloseWebSocketFrameSentByClient() {
.expectComplete()
.verify(Duration.ofSeconds(30));
}

@Test
public void testConnectionAliveWhenTransformationErrors_1() {
doTestConnectionAliveWhenTransformationErrors((in, out) ->
out.options(sendOptions -> sendOptions.flushOnEach())
.sendObject(in.aggregateFrames()
.receiveFrames()
.map(WebSocketFrame::content)
//.share()
.publish()
.autoConnect()
.map(byteBuf -> byteBuf.toString(Charset.defaultCharset()))
.map(Integer::parseInt)
.map(i -> new TextWebSocketFrame(i + ""))
.retry()),
Flux.just("1", "2"), 2);
}

@Test
public void testConnectionAliveWhenTransformationErrors_2() {
doTestConnectionAliveWhenTransformationErrors((in, out) ->
out.options(sendOptions -> sendOptions.flushOnEach())
.sendObject(in.aggregateFrames()
.receiveFrames()
.map(WebSocketFrame::content)
.concatMap(content ->
Mono.just(content)
.map(byteBuf -> byteBuf.toString(Charset.defaultCharset()))
.map(Integer::parseInt)
.map(i -> new TextWebSocketFrame(i + ""))
.onErrorResume(t -> Mono.just(new TextWebSocketFrame("error"))))),
Flux.just("1", "error", "2"), 3);
}

private void doTestConnectionAliveWhenTransformationErrors(BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> handler,
Flux<String> expectation, int count) {
httpServer =
HttpServer.create(0)
.newHandler((req, res) -> res.sendWebsocket(handler))
.block(Duration.ofSeconds(30));

ReplayProcessor<String> output = ReplayProcessor.create();
HttpClient.create(httpServer.address().getPort())
.ws("/")
.flatMap(res ->
res.receiveWebsocket((in, out) -> out.sendString(Flux.just("1", "text", "2"))
.then(in.aggregateFrames()
.receiveFrames()
.map(WebSocketFrame::content)
.map(byteBuf -> byteBuf.toString(Charset.defaultCharset()))
.take(count)
.subscribeWith(output)
.then())))
.block(Duration.ofSeconds(30));

Assertions.assertThat(output.collectList().block(Duration.ofSeconds(30)))
.isEqualTo(expectation.collectList().block(Duration.ofSeconds(30)));

}
}

0 comments on commit 23bd118

Please sign in to comment.