Skip to content

Commit

Permalink
fix(resteasy,rest): ensure request body is consumed to avoid hanging
Browse files Browse the repository at this point in the history
(cherry picked from commit a6f9ff8)
  • Loading branch information
michalvavrik authored and gsmet committed Jan 7, 2025
1 parent 7e559c0 commit 327c66a
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.quarkus.resteasy.multipart;

import java.util.function.Supplier;

import jakarta.annotation.Priority;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.FormParam;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Priorities;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.container.ContainerRequestFilter;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.ext.Provider;

import org.jboss.resteasy.annotations.providers.multipart.MultipartForm;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

public class LargeMultipartPayloadTest {

@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.setArchiveProducer(new Supplier<>() {
@Override
public JavaArchive get() {
return ShrinkWrap.create(JavaArchive.class)
.addAsResource(new StringAsset("""
quarkus.http.limits.max-body-size=30M
"""),
"application.properties");
}
});

@Test
public void testConnectionClosedOnException() {
RestAssured
.given()
.multiPart("content", twentyMegaBytes())
.post("/test/multipart")
.then()
.statusCode(500);
}

private static String twentyMegaBytes() {
return new String(new byte[20_000_000]);
}

@Path("/test")
public static class Resource {
@POST
@Path("/multipart")
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.MULTIPART_FORM_DATA)
public String postForm(@MultipartForm final FormBody ignored) {
return "ignored";
}
}

public static class FormBody {

@FormParam("content")
public String content;

}

@Priority(Priorities.USER)
@Provider
public static class Filter implements ContainerRequestFilter {

@Override
public void filter(ContainerRequestContext containerRequestContext) {
throw new RuntimeException("Expected exception");
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package io.quarkus.resteasy.reactive.server.test.multipart;

import static io.restassured.RestAssured.given;

import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import org.hamcrest.Matchers;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.MethodInfo;
import org.jboss.resteasy.reactive.RestForm;
import org.jboss.resteasy.reactive.common.model.ResourceClass;
import org.jboss.resteasy.reactive.multipart.FileUpload;
import org.jboss.resteasy.reactive.server.ServerExceptionMapper;
import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
import org.jboss.resteasy.reactive.server.model.HandlerChainCustomizer;
import org.jboss.resteasy.reactive.server.model.ServerResourceMethod;
import org.jboss.resteasy.reactive.server.processor.scanning.MethodScanner;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.resteasy.reactive.server.spi.MethodScannerBuildItem;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;

public class LargeMultipartPayloadTest {

@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.setArchiveProducer(new Supplier<>() {
@Override
public JavaArchive get() {
return ShrinkWrap.create(JavaArchive.class)
.addAsResource(new StringAsset("""
quarkus.http.limits.max-body-size=30M
"""),
"application.properties");
}
}).addBuildChainCustomizer(buildChainBuilder -> buildChainBuilder.addBuildStep(context -> context.produce(
new MethodScannerBuildItem(new MethodScanner() {
@Override
public List<HandlerChainCustomizer> scan(MethodInfo method, ClassInfo actualEndpointClass,
Map<String, Object> methodContext) {
return List.of(new AlwaysFailHandler());
}
}))).produces(MethodScannerBuildItem.class).build());

@Test
public void testConnectionClosedOnException() {
given()
.multiPart("file", twentyMegaBytes())
.post("/test")
.then()
.statusCode(200)
.body(Matchers.is("Expected failure!"));
}

private static String twentyMegaBytes() {
return new String(new byte[20_000_000]);
}

@Path("/test")
public static class Resource {

@POST
@Consumes(MediaType.MULTIPART_FORM_DATA)
public String uploadFile(@RestForm("file") FileUpload file) {
return "File " + file.fileName() + " uploaded!";
}
}

public static class Mappers {

@ServerExceptionMapper(RuntimeException.class)
Uni<Response> handle() {
return Uni.createFrom().item(Response.status(200).entity("Expected failure!").build());
}

}

public static class AlwaysFailHandler implements ServerRestHandler, HandlerChainCustomizer {

@Override
public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
requestContext.suspend();
requestContext.resume(new RuntimeException("Expected exception!"), true);
}

@Override
public List<ServerRestHandler> handlers(Phase phase, ResourceClass resourceClass, ServerResourceMethod resourceMethod) {
return List.of(new AlwaysFailHandler());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,50 @@
package io.quarkus.vertx.http.runtime;

import io.quarkus.vertx.http.runtime.filters.AbstractResponseWrapper;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerFileUpload;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.impl.HttpServerRequestInternal;
import io.vertx.core.http.impl.HttpServerRequestWrapper;

public class ResumingRequestWrapper extends HttpServerRequestWrapper {

private final HttpServerResponse httpServerResponse;
private boolean userSetState;

public ResumingRequestWrapper(HttpServerRequest request) {
public ResumingRequestWrapper(HttpServerRequest request, boolean mustResumeRequest) {
super((HttpServerRequestInternal) request);

// TODO: replace this when more than one response end handlers are allowed
if (mustResumeRequest) {
HttpServerResponse response = delegate.response();
response.endHandler(new Handler<Void>() {
@Override
public void handle(Void unused) {
if (!delegate.isEnded()) {
delegate.resume();
}
}
});
this.httpServerResponse = new AbstractResponseWrapper(response) {
@Override
public HttpServerResponse endHandler(Handler<Void> handler) {
return super.endHandler(new Handler<Void>() {
@Override
public void handle(Void unused) {
handler.handle(null);
if (!delegate.isEnded()) {
delegate.resume();
}
}
});
}
};
} else {
this.httpServerResponse = null;
}
}

@Override
Expand Down Expand Up @@ -71,4 +103,13 @@ public HttpServerRequest uploadHandler(Handler<HttpServerFileUpload> handler) {
}
return this;
}

@Override
public HttpServerResponse response() {
if (httpServerResponse != null) {
return httpServerResponse;
} else {
return super.response();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -528,8 +528,9 @@ public void handle(HttpServerRequest event) {
};
}

final boolean mustResumeRequest = httpConfiguration.limits.maxBodySize.isPresent();
Handler<HttpServerRequest> delegate = root;
root = HttpServerCommonHandlers.enforceDuplicatedContext(delegate);
root = HttpServerCommonHandlers.enforceDuplicatedContext(delegate, mustResumeRequest);
if (httpConfiguration.recordRequestStartTime) {
httpRouteRouter.route().order(RouteConstants.ROUTE_ORDER_RECORD_START_TIME).handler(new Handler<RoutingContext>() {
@Override
Expand Down Expand Up @@ -569,7 +570,7 @@ public void handle(RoutingContext event) {
HttpServerCommonHandlers.applyHeaders(managementConfiguration.getValue().header, mr);
applyCompression(managementBuildTimeConfig.enableCompression, mr);

Handler<HttpServerRequest> handler = HttpServerCommonHandlers.enforceDuplicatedContext(mr);
Handler<HttpServerRequest> handler = HttpServerCommonHandlers.enforceDuplicatedContext(mr, mustResumeRequest);
handler = HttpServerCommonHandlers.applyProxy(managementConfiguration.getValue().proxy, handler, vertx);

int routesBeforeMiEvent = mr.getRoutes().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import io.vertx.core.net.HostAndPort;
import io.vertx.core.streams.ReadStream;

class AbstractResponseWrapper implements HttpServerResponse {
public class AbstractResponseWrapper implements HttpServerResponse {

private final HttpServerResponse delegate;

AbstractResponseWrapper(HttpServerResponse delegate) {
protected AbstractResponseWrapper(HttpServerResponse delegate) {
this.delegate = delegate;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public void handle(Void e) {
}
}

public static Handler<HttpServerRequest> enforceDuplicatedContext(Handler<HttpServerRequest> delegate) {
public static Handler<HttpServerRequest> enforceDuplicatedContext(Handler<HttpServerRequest> delegate,
boolean mustResumeRequest) {
return new Handler<HttpServerRequest>() {
@Override
public void handle(HttpServerRequest event) {
Expand All @@ -78,12 +79,12 @@ public void handle(HttpServerRequest event) {
@Override
public void handle(Void x) {
setCurrentContextSafe(true);
delegate.handle(new ResumingRequestWrapper(event));
delegate.handle(new ResumingRequestWrapper(event, mustResumeRequest));
}
});
} else {
setCurrentContextSafe(true);
delegate.handle(new ResumingRequestWrapper(event));
delegate.handle(new ResumingRequestWrapper(event, mustResumeRequest));
}
}
};
Expand Down

0 comments on commit 327c66a

Please sign in to comment.