Skip to content

Commit

Permalink
Rename WS implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed Nov 6, 2017
1 parent 2593e1d commit ee9f2d9
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public boolean isKeepAlive() {
@Override
public boolean isWebsocket() {
return get(channel()).getClass()
.equals(HttpClientWSOperations.class);
.equals(WebsocketClientOperations.class);
}

@Override
Expand Down Expand Up @@ -399,8 +399,8 @@ public ByteBufAllocator alloc() {
@Override
public String selectedSubprotocol() {
if (isWebsocket()) {
HttpClientWSOperations ops =
(HttpClientWSOperations) get(channel());
WebsocketClientOperations ops =
(WebsocketClientOperations) get(channel());

assert ops != null;
return ops.selectedSubprotocol();
Expand Down Expand Up @@ -671,7 +671,8 @@ final Mono<Void> withWebsocketSupport(URI url,
if (markSentHeaders()) {
addHandlerFirst(NettyPipeline.HttpAggregator, new HttpObjectAggregator(8192));

HttpClientWSOperations ops = new HttpClientWSOperations(url, protocols, this);
WebsocketClientOperations
ops = new WebsocketClientOperations(url, protocols, this);

if (replace(ops)) {
Mono<Void> handshake = FutureMono.from(ops.handshakerResult)
Expand All @@ -685,8 +686,8 @@ final Mono<Void> withWebsocketSupport(URI url,
}
}
else if (isWebsocket()) {
HttpClientWSOperations ops =
(HttpClientWSOperations) get(channel());
WebsocketClientOperations ops =
(WebsocketClientOperations) get(channel());
if(ops != null) {
Mono<Void> handshake = FutureMono.from(ops.handshakerResult);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@
* @author Stephane Maldini
* @author Simon Baslé
*/
final class HttpClientWSOperations extends HttpClientOperations
final class WebsocketClientOperations extends HttpClientOperations
implements WebsocketInbound, WebsocketOutbound, BiConsumer<Void, Throwable> {

final WebSocketClientHandshaker handshaker;
final ChannelPromise handshakerResult;

volatile int closeSent;

HttpClientWSOperations(URI currentURI,
WebsocketClientOperations(URI currentURI,
String protocols,
HttpClientOperations replaced) {
super(replaced.channel(), replaced);
Expand Down Expand Up @@ -228,7 +228,7 @@ public void accept(Void aVoid, Throwable throwable) {
}
}

static final AtomicIntegerFieldUpdater<HttpClientWSOperations> CLOSE_SENT =
AtomicIntegerFieldUpdater.newUpdater(HttpClientWSOperations.class,
static final AtomicIntegerFieldUpdater<WebsocketClientOperations> CLOSE_SENT =
AtomicIntegerFieldUpdater.newUpdater(WebsocketClientOperations.class,
"closeSent");
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.util.AsciiString;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -463,7 +462,8 @@ final Mono<Void> withWebsocketSupport(String url,
BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) {
Objects.requireNonNull(websocketHandler, "websocketHandler");
if (markSentHeaders()) {
HttpServerWSOperations ops = new HttpServerWSOperations(url, protocols, this);
WebsocketServerOperations
ops = new WebsocketServerOperations(url, protocols, this);

if (replace(ops)) {
return FutureMono.from(ops.handshakerResult)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.ReferenceCountUtil;
import reactor.ipc.netty.NettyPipeline;
import reactor.ipc.netty.http.HttpOperations;
import reactor.ipc.netty.http.websocket.WebsocketInbound;
Expand All @@ -46,15 +45,15 @@
* @author Stephane Maldini
* @author Simon Baslé
*/
final class HttpServerWSOperations extends HttpServerOperations
final class WebsocketServerOperations extends HttpServerOperations
implements WebsocketInbound, WebsocketOutbound, BiConsumer<Void, Throwable> {

final WebSocketServerHandshaker handshaker;
final ChannelPromise handshakerResult;

volatile int closeSent;

HttpServerWSOperations(String wsUrl,
WebsocketServerOperations(String wsUrl,
@Nullable String protocols,
HttpServerOperations replaced) {
super(replaced.channel(), replaced);
Expand Down Expand Up @@ -155,7 +154,7 @@ public String selectedSubprotocol() {
return handshaker.selectedSubprotocol();
}

static final AtomicIntegerFieldUpdater<HttpServerWSOperations> CLOSE_SENT =
AtomicIntegerFieldUpdater.newUpdater(HttpServerWSOperations.class,
static final AtomicIntegerFieldUpdater<WebsocketServerOperations> CLOSE_SENT =
AtomicIntegerFieldUpdater.newUpdater(WebsocketServerOperations.class,
"closeSent");
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
/**
* @author Violeta Georgieva
*/
public class HttpClientWSOperationsTest {
public class WebsocketClientOperationsTest {

@Test
public void requestError() {
Expand Down

0 comments on commit ee9f2d9

Please sign in to comment.