Skip to content

Commit

Permalink
Implement netty client using the new micronaut-core RawHttpClient
Browse files Browse the repository at this point in the history
This PR replaces most of the netty-based HTTP client that previously used the low-level micronaut-http-client ConnectionManager API with an implementation based on the new micronaut-http-client-core RawHttpClient introduced in micronaut-core 4.7.0. This offers some advantages:

- Much simpler implementation. Once the legacy implementation is not needed anymore, all classes deprecated in this PR can be removed.
- Possibility to work with non-netty RawHttpClient implementations, once those exist. In particular, this will allow using the JDK http client instead.
- RawHttpClient runs normal micronaut-core ClientFilters, so we won't need the `OciNettyClientFilter` API anymore.

While in theory the new implementation should be a drop-in replacement, it is possible that RawHttpClient differs slightly in behavior. For that reason, I've kept the old implementation around. It can be enabled by the `oci.netty.legacy-netty-client` config property, or the `io.micronaut.oraclecloud.httpclient.netty.legacy-netty-client` system property.

This also means that MicronautHttpRequest and MicronautHttpResponse are actually mostly copied from NettyHttpRequest and NettyHttpResponse. Please consider that when reviewing, the actual changes are not that big.

This PR works and is ready for review, but still needs a release of micronaut-core 4.7.0 (and of micronaut-serialization due to the unrelated micronaut-projects/micronaut-serialization#943 ).
  • Loading branch information
yawkat committed Oct 9, 2024
1 parent ceb976f commit abd474e
Show file tree
Hide file tree
Showing 28 changed files with 858 additions and 309 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* {@link io.netty.handler.codec.http.LastHttpContent} is received completes a {@link #future} with
* the accumulated data.
*/
@Deprecated
final class BufferFutureHandler extends DecidedBodyHandler {
final CompletableFuture<ByteBuf> future = new CompletableFuture<>();
private CompositeByteBuf buffer;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* consume the body. First any data buffered by {@link UndecidedBodyHandler} is processed, then a {@link HandlerImpl}
* is added to the pipeline to directly process the remaining chunks.
*/
@Deprecated
abstract class DecidedBodyHandler {
private boolean done = false;
private volatile ChannelHandlerContext context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
/**
* Handler that discards incoming data.
*/
@Deprecated
final class DiscardingHandler extends DecidedBodyHandler {
static final DiscardingHandler INSTANCE = new DiscardingHandler();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
/**
* {@link java.util.Map} wrapper around netty {@link HttpHeaders}. Read-only.
*/
@Deprecated
final class HeaderMap extends AbstractMap<String, List<String>> {
private final HttpHeaders headers;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* Handler that buffers some response bytes until a set limit. This way, when normal body reading fails, we can still
* read a potentially short error message from this handler.
*/
@Deprecated
final class LimitedBufferingBodyHandler extends ChannelInboundHandlerAdapter {
private final int maxBuffer;
private CompositeByteBuf buffer;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.oraclecloud.httpclient.netty;

import io.micronaut.core.io.buffer.ByteBuffer;
import io.micronaut.core.io.buffer.ReferenceCounted;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
* {@link Subscriber} implementation that buffers its input up to a certain number of bytes.
*
* @author Jonas Konrad
* @since 4.3
*/
final class LimitedBufferingSubscriber implements Subscriber<ByteBuffer<?>>, Closeable {
final CompletableFuture<byte[]> future = new CompletableFuture<>();

private final int maxBuffer;
private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

private boolean closed;
private Subscription subscription;

LimitedBufferingSubscriber(int maxBuffer) {
this.maxBuffer = maxBuffer;
}

@Override
public void onSubscribe(Subscription s) {
boolean closed;
synchronized (this) {
closed = this.closed;
if (!closed) {
this.subscription = s;
}
}
if (closed) {
s.cancel();
}
}

@Override
public void onNext(ByteBuffer<?> byteBuffer) {
try {
byteBuffer.toInputStream().transferTo(buffer);
} catch (IOException e) {
future.completeExceptionally(e);
}
if (byteBuffer instanceof ReferenceCounted rc) {
rc.release();
}
if (buffer.size() >= maxBuffer) {
future.completeExceptionally(new IOException("Request body was streamed and too large for opportunistic buffering"));
subscription.cancel();
} else {
subscription.request(1);
}
}

@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}

@Override
public void onComplete() {
future.complete(buffer.toByteArray());
}

@Override
public void close() {
synchronized (this) {
if (!closed) {
if (subscription != null) {
subscription.cancel();
subscription = null;
}
closed = true;
}
}
}
}
Loading

0 comments on commit abd474e

Please sign in to comment.