From face255d9b0d29c631fb33fdf1f33365d734e021 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 12 May 2022 09:30:13 -0500 Subject: [PATCH 1/5] Draft of grpc-web filter to translate to grpc --- .../jakarta/web/GrpcWebAsyncListener.java | 36 ++++ .../servlet/jakarta/web/GrpcWebFilter.java | 201 ++++++++++++++++++ .../web/GrpcWebServletOutputStream.java | 86 ++++++++ .../server/jetty/JettyBackedGrpcServer.java | 4 + 4 files changed, 327 insertions(+) create mode 100644 grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebAsyncListener.java create mode 100644 grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebFilter.java create mode 100644 grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletOutputStream.java diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebAsyncListener.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebAsyncListener.java new file mode 100644 index 00000000000..1ccb10167c5 --- /dev/null +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebAsyncListener.java @@ -0,0 +1,36 @@ +package io.grpc.servlet.jakarta.web; + +import jakarta.servlet.AsyncEvent; +import jakarta.servlet.AsyncListener; +import jakarta.servlet.http.HttpServletResponseWrapper; + +import java.io.IOException; + +public class GrpcWebAsyncListener implements AsyncListener { + private final GrpcWebFilter.GrpcWebHttpResponse wrappedResponse; + + public GrpcWebAsyncListener(GrpcWebFilter.GrpcWebHttpResponse wrappedResponse) { + this.wrappedResponse = wrappedResponse; + } + + @Override + public void onComplete(AsyncEvent event) throws IOException { + // stream is ending, write trailers as headers + wrappedResponse.finish(); + } + + @Override + public void onTimeout(AsyncEvent event) throws IOException { + + } + + @Override + public void onError(AsyncEvent event) throws IOException { + + } + + @Override + public void onStartAsync(AsyncEvent event) throws IOException { + + } +} diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebFilter.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebFilter.java new file mode 100644 index 00000000000..50a8313dba7 --- /dev/null +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebFilter.java @@ -0,0 +1,201 @@ +package io.grpc.servlet.jakarta.web; + +import io.grpc.internal.GrpcUtil; +import jakarta.servlet.AsyncContext; +import jakarta.servlet.AsyncListener; +import jakarta.servlet.FilterChain; +import jakarta.servlet.ServletContext; +import jakarta.servlet.ServletException; +import jakarta.servlet.ServletRequest; +import jakarta.servlet.ServletResponse; +import jakarta.servlet.http.HttpFilter; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletRequestWrapper; +import jakarta.servlet.http.HttpServletResponse; +import jakarta.servlet.http.HttpServletResponseWrapper; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import java.util.function.Supplier; +import java.util.regex.Pattern; + +public class GrpcWebFilter extends HttpFilter { + public static final String CONTENT_TYPE_GRPC_WEB = GrpcUtil.CONTENT_TYPE_GRPC + "-web"; + + @Override + public void doFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain) + throws IOException, ServletException { + if (isGrpcWeb(request)) { + // wrap the request and response to paper over the grpc-web details + GrpcWebHttpResponse wrappedResponse = new GrpcWebHttpResponse(response); + HttpServletRequestWrapper wrappedRequest = new HttpServletRequestWrapper(request) { + @Override + public String getContentType() { + // Adapt the content-type to replace grpc-web with grpc + return super.getContentType().replaceFirst(Pattern.quote(CONTENT_TYPE_GRPC_WEB), + GrpcUtil.CONTENT_TYPE_GRPC); + } + + @Override + public AsyncContext startAsync() throws IllegalStateException { + return super.startAsync(this, wrappedResponse); + } + + @Override + public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) + throws IllegalStateException { + AsyncContext delegate = super.startAsync(servletRequest, servletResponse); + return new DelegatingAsyncContext(delegate) { + @Override + public void complete() { + try { + wrappedResponse.finish(); + } catch (IOException e) { + // TODO reconsider this, find a better way to report + throw new UncheckedIOException(e); + } + super.complete(); + } + }; + } + }; + + try { + chain.doFilter(wrappedRequest, wrappedResponse); + } finally { + // if (request.isAsyncStarted()) { + // request.getAsyncContext().addListener(new GrpcWebAsyncListener(wrappedResponse)); + // } + } + } else { + chain.doFilter(request, response); + } + } + + private static boolean isGrpcWeb(ServletRequest request) { + return request.getContentType() != null && request.getContentType().startsWith(CONTENT_TYPE_GRPC_WEB); + } + + // Technically we should throw away content-length too, but the impl won't care + public static class GrpcWebHttpResponse extends HttpServletResponseWrapper { + private Supplier> trailers; + private GrpcWebServletOutputStream outputStream; + + public GrpcWebHttpResponse(HttpServletResponse response) { + super(response); + } + + @Override + public void setContentType(String type) { + // Adapt the content-type to be grpc-web + super.setContentType( + type.replaceFirst(Pattern.quote(GrpcUtil.CONTENT_TYPE_GRPC), CONTENT_TYPE_GRPC_WEB)); + } + + @Override + public GrpcWebServletOutputStream getOutputStream() throws IOException { + if (outputStream == null) { + outputStream = new GrpcWebServletOutputStream(super.getOutputStream()); + } + return outputStream; + } + + @Override + public void flushBuffer() throws IOException { + super.getOutputStream().flush(); + } + + // intercept trailers and write them out as a message just before we complete + @Override + public void setTrailerFields(Supplier> supplier) { + trailers = supplier; + } + + @Override + public Supplier> getTrailerFields() { + return trailers; + } + + public void finish() throws IOException { + // write any trailers out to the output stream + getOutputStream().writeTrailers(trailers); + } + + + } + + private static class DelegatingAsyncContext implements AsyncContext { + private final AsyncContext delegate; + + private DelegatingAsyncContext(AsyncContext delegate) { + this.delegate = delegate; + } + + @Override + public ServletRequest getRequest() { + return delegate.getRequest(); + } + + @Override + public ServletResponse getResponse() { + return delegate.getResponse(); + } + + @Override + public boolean hasOriginalRequestAndResponse() { + return delegate.hasOriginalRequestAndResponse(); + } + + @Override + public void dispatch() { + delegate.dispatch(); + } + + @Override + public void dispatch(String path) { + delegate.dispatch(path); + } + + @Override + public void dispatch(ServletContext context, String path) { + delegate.dispatch(context, path); + } + + @Override + public void complete() { + delegate.complete(); + } + + @Override + public void start(Runnable run) { + delegate.start(run); + } + + @Override + public void addListener(AsyncListener listener) { + delegate.addListener(listener); + } + + @Override + public void addListener(AsyncListener listener, ServletRequest servletRequest, + ServletResponse servletResponse) { + delegate.addListener(listener, servletRequest, servletResponse); + } + + @Override + public T createListener(Class clazz) throws ServletException { + return delegate.createListener(clazz); + } + + @Override + public void setTimeout(long timeout) { + delegate.setTimeout(timeout); + } + + @Override + public long getTimeout() { + return delegate.getTimeout(); + } + } +} diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletOutputStream.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletOutputStream.java new file mode 100644 index 00000000000..7f882762090 --- /dev/null +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletOutputStream.java @@ -0,0 +1,86 @@ +package io.grpc.servlet.jakarta.web; + +import jakarta.servlet.ServletOutputStream; +import jakarta.servlet.WriteListener; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.function.Supplier; + +public class GrpcWebServletOutputStream extends ServletOutputStream { + private final ServletOutputStream wrapped; + private boolean readyForFrame = true; + + public GrpcWebServletOutputStream(ServletOutputStream wrapped) { + this.wrapped = wrapped; + } + + @Override + public boolean isReady() { + return wrapped.isReady(); + } + + @Override + public void setWriteListener(WriteListener writeListener) { + wrapped.setWriteListener(writeListener); + } + + @Override + public void write(int i) throws IOException { + // TODO handle buffered impl too + + // intercept write and insert message framing + // if (readyForFrame) { + // wrapped.write(); + // } + wrapped.write(i); + } + + @Override + public void write(byte[] b) throws IOException { + wrapped.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + wrapped.write(b, off, len); + } + + @Override + public void flush() throws IOException { + super.flush(); + // ready to start a new frame + readyForFrame = true; + } + + @Override + public void close() throws IOException { + super.close(); + } + + public void writeTrailers(Supplier> trailers) throws IOException { + // probably could inline this and drop the class if we don't have to frame other messages + if (trailers == null) { + return; + } + Map map = trailers.get(); + if (map == null) { + return; + } + // write a payload, even for an empty set of trailers + int trailerLength = + map.entrySet().stream().mapToInt(e -> e.getKey().length() + e.getValue().length() + 4).sum(); + ByteBuffer payload = ByteBuffer.allocate(5 + trailerLength); + payload.put((byte) 0x80); + payload.putInt(trailerLength); + for (Map.Entry entry : map.entrySet()) { + payload.put(entry.getKey().getBytes(StandardCharsets.US_ASCII)); + payload.put(": ".getBytes(StandardCharsets.US_ASCII)); + payload.put(entry.getValue().getBytes(StandardCharsets.US_ASCII)); + payload.put("\r\n".getBytes(StandardCharsets.US_ASCII)); + } + wrapped.write(payload.array()); + } +} diff --git a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyBackedGrpcServer.java b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyBackedGrpcServer.java index ed72f5758c3..5e91fbe9290 100644 --- a/server/jetty/src/main/java/io/deephaven/server/jetty/JettyBackedGrpcServer.java +++ b/server/jetty/src/main/java/io/deephaven/server/jetty/JettyBackedGrpcServer.java @@ -11,6 +11,7 @@ import io.deephaven.ssl.config.TrustJdk; import io.deephaven.ssl.config.impl.KickstartUtils; import io.grpc.servlet.web.websocket.WebSocketServerStream; +import io.grpc.servlet.jakarta.web.GrpcWebFilter; import jakarta.servlet.DispatcherType; import jakarta.websocket.server.ServerEndpointConfig; import nl.altindag.ssl.SSLFactory; @@ -78,6 +79,9 @@ public JettyBackedGrpcServer( // Direct jetty all use this configuration as the root application context.setContextPath("/"); + // Handle grpc-web connections, translate to vanilla grpc + context.addFilter(new FilterHolder(new GrpcWebFilter()), "/*", EnumSet.noneOf(DispatcherType.class)); + // Wire up the provided grpc filter context.addFilter(new FilterHolder(filter), "/*", EnumSet.noneOf(DispatcherType.class)); From d5c842a07d85da9aaeead597788e85bfa76ed440 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 13 May 2022 10:29:46 -0500 Subject: [PATCH 2/5] Clean up the implementation, as this seems the simplest solution --- .../jakarta/web/DelegatingAsyncContext.java | 88 +++++++++++ .../jakarta/web/GrpcWebAsyncListener.java | 36 ----- .../servlet/jakarta/web/GrpcWebFilter.java | 139 +++++------------- .../web/GrpcWebServletOutputStream.java | 24 --- 4 files changed, 121 insertions(+), 166 deletions(-) create mode 100644 grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/DelegatingAsyncContext.java delete mode 100644 grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebAsyncListener.java diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/DelegatingAsyncContext.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/DelegatingAsyncContext.java new file mode 100644 index 00000000000..a451a26a6c5 --- /dev/null +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/DelegatingAsyncContext.java @@ -0,0 +1,88 @@ +package io.grpc.servlet.jakarta.web; + +import jakarta.servlet.AsyncContext; +import jakarta.servlet.AsyncListener; +import jakarta.servlet.ServletContext; +import jakarta.servlet.ServletException; +import jakarta.servlet.ServletRequest; +import jakarta.servlet.ServletResponse; + +/** + * Util class to allow the complete() call to get some work done (writing trailers as a payload) before + * calling the actual container implementation. The container will finish closing the stream before + * invoking the async listener and formally informing the filter that the stream has closed, making + * this our last chance to intercept the closing of the stream before it happens. + */ +public class DelegatingAsyncContext implements AsyncContext { + private final AsyncContext delegate; + + public DelegatingAsyncContext(AsyncContext delegate) { + this.delegate = delegate; + } + + @Override + public ServletRequest getRequest() { + return delegate.getRequest(); + } + + @Override + public ServletResponse getResponse() { + return delegate.getResponse(); + } + + @Override + public boolean hasOriginalRequestAndResponse() { + return delegate.hasOriginalRequestAndResponse(); + } + + @Override + public void dispatch() { + delegate.dispatch(); + } + + @Override + public void dispatch(String path) { + delegate.dispatch(path); + } + + @Override + public void dispatch(ServletContext context, String path) { + delegate.dispatch(context, path); + } + + @Override + public void complete() { + delegate.complete(); + } + + @Override + public void start(Runnable run) { + delegate.start(run); + } + + @Override + public void addListener(AsyncListener listener) { + delegate.addListener(listener); + } + + @Override + public void addListener(AsyncListener listener, ServletRequest servletRequest, + ServletResponse servletResponse) { + delegate.addListener(listener, servletRequest, servletResponse); + } + + @Override + public T createListener(Class clazz) throws ServletException { + return delegate.createListener(clazz); + } + + @Override + public void setTimeout(long timeout) { + delegate.setTimeout(timeout); + } + + @Override + public long getTimeout() { + return delegate.getTimeout(); + } +} diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebAsyncListener.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebAsyncListener.java deleted file mode 100644 index 1ccb10167c5..00000000000 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebAsyncListener.java +++ /dev/null @@ -1,36 +0,0 @@ -package io.grpc.servlet.jakarta.web; - -import jakarta.servlet.AsyncEvent; -import jakarta.servlet.AsyncListener; -import jakarta.servlet.http.HttpServletResponseWrapper; - -import java.io.IOException; - -public class GrpcWebAsyncListener implements AsyncListener { - private final GrpcWebFilter.GrpcWebHttpResponse wrappedResponse; - - public GrpcWebAsyncListener(GrpcWebFilter.GrpcWebHttpResponse wrappedResponse) { - this.wrappedResponse = wrappedResponse; - } - - @Override - public void onComplete(AsyncEvent event) throws IOException { - // stream is ending, write trailers as headers - wrappedResponse.finish(); - } - - @Override - public void onTimeout(AsyncEvent event) throws IOException { - - } - - @Override - public void onError(AsyncEvent event) throws IOException { - - } - - @Override - public void onStartAsync(AsyncEvent event) throws IOException { - - } -} diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebFilter.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebFilter.java index 50a8313dba7..81659aa50b1 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebFilter.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebFilter.java @@ -2,9 +2,7 @@ import io.grpc.internal.GrpcUtil; import jakarta.servlet.AsyncContext; -import jakarta.servlet.AsyncListener; import jakarta.servlet.FilterChain; -import jakarta.servlet.ServletContext; import jakarta.servlet.ServletException; import jakarta.servlet.ServletRequest; import jakarta.servlet.ServletResponse; @@ -16,10 +14,19 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.function.Supplier; import java.util.regex.Pattern; +/** + * Servlet filter that translates grpc-web on the fly to match what is expected by GrpcServlet. + * This work is done in-process with no addition copies to the request or response data - only + * the content type header and the trailer content is specially treated at this time. + * + * Note that grpc-web-text is not yet supported. + */ public class GrpcWebFilter extends HttpFilter { public static final String CONTENT_TYPE_GRPC_WEB = GrpcUtil.CONTENT_TYPE_GRPC + "-web"; @@ -39,7 +46,7 @@ public String getContentType() { @Override public AsyncContext startAsync() throws IllegalStateException { - return super.startAsync(this, wrappedResponse); + return startAsync(this, wrappedResponse); } @Override @@ -49,25 +56,40 @@ public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse se return new DelegatingAsyncContext(delegate) { @Override public void complete() { + // Write any trailers out to the output stream as a payload, since grpc-web doesn't + // use proper trailers. try { - wrappedResponse.finish(); + if (wrappedResponse.trailers != null) { + Map map = wrappedResponse.trailers.get(); + if (map != null) { + // write a payload, even for an empty set of trailers, but not for + // the absence of trailers. + int trailerLength = map.entrySet().stream().mapToInt(e -> e.getKey().length() + e.getValue().length() + 4).sum(); + ByteBuffer payload = ByteBuffer.allocate(5 + trailerLength); + payload.put((byte) 0x80); + payload.putInt(trailerLength); + for (Map.Entry entry : map.entrySet()) { + payload.put(entry.getKey().getBytes(StandardCharsets.US_ASCII)); + payload.put(": ".getBytes(StandardCharsets.US_ASCII)); + payload.put(entry.getValue().getBytes(StandardCharsets.US_ASCII)); + payload.put("\r\n".getBytes(StandardCharsets.US_ASCII)); + } + wrappedResponse.getOutputStream().write(payload.array()); + } + } } catch (IOException e) { // TODO reconsider this, find a better way to report throw new UncheckedIOException(e); } + + // Let the superclass complete the stream so we formally close it super.complete(); } }; } }; - try { - chain.doFilter(wrappedRequest, wrappedResponse); - } finally { - // if (request.isAsyncStarted()) { - // request.getAsyncContext().addListener(new GrpcWebAsyncListener(wrappedResponse)); - // } - } + chain.doFilter(wrappedRequest, wrappedResponse); } else { chain.doFilter(request, response); } @@ -80,7 +102,6 @@ private static boolean isGrpcWeb(ServletRequest request) { // Technically we should throw away content-length too, but the impl won't care public static class GrpcWebHttpResponse extends HttpServletResponseWrapper { private Supplier> trailers; - private GrpcWebServletOutputStream outputStream; public GrpcWebHttpResponse(HttpServletResponse response) { super(response); @@ -93,19 +114,6 @@ public void setContentType(String type) { type.replaceFirst(Pattern.quote(GrpcUtil.CONTENT_TYPE_GRPC), CONTENT_TYPE_GRPC_WEB)); } - @Override - public GrpcWebServletOutputStream getOutputStream() throws IOException { - if (outputStream == null) { - outputStream = new GrpcWebServletOutputStream(super.getOutputStream()); - } - return outputStream; - } - - @Override - public void flushBuffer() throws IOException { - super.getOutputStream().flush(); - } - // intercept trailers and write them out as a message just before we complete @Override public void setTrailerFields(Supplier> supplier) { @@ -116,86 +124,5 @@ public void setTrailerFields(Supplier> supplier) { public Supplier> getTrailerFields() { return trailers; } - - public void finish() throws IOException { - // write any trailers out to the output stream - getOutputStream().writeTrailers(trailers); - } - - - } - - private static class DelegatingAsyncContext implements AsyncContext { - private final AsyncContext delegate; - - private DelegatingAsyncContext(AsyncContext delegate) { - this.delegate = delegate; - } - - @Override - public ServletRequest getRequest() { - return delegate.getRequest(); - } - - @Override - public ServletResponse getResponse() { - return delegate.getResponse(); - } - - @Override - public boolean hasOriginalRequestAndResponse() { - return delegate.hasOriginalRequestAndResponse(); - } - - @Override - public void dispatch() { - delegate.dispatch(); - } - - @Override - public void dispatch(String path) { - delegate.dispatch(path); - } - - @Override - public void dispatch(ServletContext context, String path) { - delegate.dispatch(context, path); - } - - @Override - public void complete() { - delegate.complete(); - } - - @Override - public void start(Runnable run) { - delegate.start(run); - } - - @Override - public void addListener(AsyncListener listener) { - delegate.addListener(listener); - } - - @Override - public void addListener(AsyncListener listener, ServletRequest servletRequest, - ServletResponse servletResponse) { - delegate.addListener(listener, servletRequest, servletResponse); - } - - @Override - public T createListener(Class clazz) throws ServletException { - return delegate.createListener(clazz); - } - - @Override - public void setTimeout(long timeout) { - delegate.setTimeout(timeout); - } - - @Override - public long getTimeout() { - return delegate.getTimeout(); - } } } diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletOutputStream.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletOutputStream.java index 7f882762090..6dcac9a4ec5 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletOutputStream.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletOutputStream.java @@ -59,28 +59,4 @@ public void flush() throws IOException { public void close() throws IOException { super.close(); } - - public void writeTrailers(Supplier> trailers) throws IOException { - // probably could inline this and drop the class if we don't have to frame other messages - if (trailers == null) { - return; - } - Map map = trailers.get(); - if (map == null) { - return; - } - // write a payload, even for an empty set of trailers - int trailerLength = - map.entrySet().stream().mapToInt(e -> e.getKey().length() + e.getValue().length() + 4).sum(); - ByteBuffer payload = ByteBuffer.allocate(5 + trailerLength); - payload.put((byte) 0x80); - payload.putInt(trailerLength); - for (Map.Entry entry : map.entrySet()) { - payload.put(entry.getKey().getBytes(StandardCharsets.US_ASCII)); - payload.put(": ".getBytes(StandardCharsets.US_ASCII)); - payload.put(entry.getValue().getBytes(StandardCharsets.US_ASCII)); - payload.put("\r\n".getBytes(StandardCharsets.US_ASCII)); - } - wrapped.write(payload.array()); - } } From ffb63877ef080bc197d3894db96029651d0b7952 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 2 Aug 2022 14:19:10 -0500 Subject: [PATCH 3/5] Work around eclipse/jetty.project#8405 --- .../main/java/io/grpc/servlet/jakarta/ServletAdapter.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java index f52afb0fd3f..7e6a992e290 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java @@ -44,6 +44,7 @@ import java.util.Enumeration; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import static com.google.common.base.Preconditions.checkArgument; @@ -273,6 +274,7 @@ private static final class GrpcReadListener implements ReadListener { final AsyncContext asyncCtx; final ServletInputStream input; final InternalLogId logId; + private final AtomicBoolean closed = new AtomicBoolean(false); GrpcReadListener( ServletServerStream stream, @@ -315,6 +317,11 @@ public void onDataAvailable() throws IOException { @Override public void onAllDataRead() { logger.log(FINE, "[{0}] onAllDataRead", logId); + if (!closed.compareAndSet(false, true)) { + // https://github.com/eclipse/jetty.project/issues/8405 + logger.log(FINE, "[{0}] onAllDataRead already called, skipping this one", logId); + return; + } stream.transportState().runOnTransportThread( () -> stream.transportState().inboundDataReceived(ReadableBuffers.empty(), true)); } From 6244e864f8656c2941d6f416ed32c10b506027d3 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Fri, 5 Aug 2022 10:08:43 -0500 Subject: [PATCH 4/5] Spotless and cleanup for review --- .../jakarta/web/DelegatingAsyncContext.java | 10 +-- .../servlet/jakarta/web/GrpcWebFilter.java | 20 +++--- .../web/GrpcWebServletOutputStream.java | 62 ------------------- 3 files changed, 18 insertions(+), 74 deletions(-) delete mode 100644 grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletOutputStream.java diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/DelegatingAsyncContext.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/DelegatingAsyncContext.java index a451a26a6c5..a51577f99a8 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/DelegatingAsyncContext.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/DelegatingAsyncContext.java @@ -8,10 +8,10 @@ import jakarta.servlet.ServletResponse; /** - * Util class to allow the complete() call to get some work done (writing trailers as a payload) before - * calling the actual container implementation. The container will finish closing the stream before - * invoking the async listener and formally informing the filter that the stream has closed, making - * this our last chance to intercept the closing of the stream before it happens. + * Util class to allow the complete() call to get some work done (writing trailers as a payload) before calling the + * actual container implementation. The container will finish closing the stream before invoking the async listener and + * formally informing the filter that the stream has closed, making this our last chance to intercept the closing of the + * stream before it happens. */ public class DelegatingAsyncContext implements AsyncContext { private final AsyncContext delegate; @@ -67,7 +67,7 @@ public void addListener(AsyncListener listener) { @Override public void addListener(AsyncListener listener, ServletRequest servletRequest, - ServletResponse servletResponse) { + ServletResponse servletResponse) { delegate.addListener(listener, servletRequest, servletResponse); } diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebFilter.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebFilter.java index 81659aa50b1..66fba1f93a9 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebFilter.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebFilter.java @@ -13,21 +13,24 @@ import jakarta.servlet.http.HttpServletResponseWrapper; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.regex.Pattern; /** - * Servlet filter that translates grpc-web on the fly to match what is expected by GrpcServlet. - * This work is done in-process with no addition copies to the request or response data - only - * the content type header and the trailer content is specially treated at this time. + * Servlet filter that translates grpc-web on the fly to match what is expected by GrpcServlet. This work is done + * in-process with no addition copies to the request or response data - only the content type header and the trailer + * content is specially treated at this time. * * Note that grpc-web-text is not yet supported. */ public class GrpcWebFilter extends HttpFilter { + private static final Logger logger = Logger.getLogger(GrpcWebFilter.class.getName()); + public static final String CONTENT_TYPE_GRPC_WEB = GrpcUtil.CONTENT_TYPE_GRPC + "-web"; @Override @@ -64,7 +67,8 @@ public void complete() { if (map != null) { // write a payload, even for an empty set of trailers, but not for // the absence of trailers. - int trailerLength = map.entrySet().stream().mapToInt(e -> e.getKey().length() + e.getValue().length() + 4).sum(); + int trailerLength = map.entrySet().stream() + .mapToInt(e -> e.getKey().length() + e.getValue().length() + 4).sum(); ByteBuffer payload = ByteBuffer.allocate(5 + trailerLength); payload.put((byte) 0x80); payload.putInt(trailerLength); @@ -78,8 +82,10 @@ public void complete() { } } } catch (IOException e) { - // TODO reconsider this, find a better way to report - throw new UncheckedIOException(e); + // complete() should not throw, but instead just log the error. In this case, + // the connection has likely been lost, so there is no way to send the trailers, + // so we just let the exception slide. + logger.log(Level.FINE, "Error sending grpc-web trailers", e); } // Let the superclass complete the stream so we formally close it diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletOutputStream.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletOutputStream.java deleted file mode 100644 index 6dcac9a4ec5..00000000000 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/web/GrpcWebServletOutputStream.java +++ /dev/null @@ -1,62 +0,0 @@ -package io.grpc.servlet.jakarta.web; - -import jakarta.servlet.ServletOutputStream; -import jakarta.servlet.WriteListener; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.function.Supplier; - -public class GrpcWebServletOutputStream extends ServletOutputStream { - private final ServletOutputStream wrapped; - private boolean readyForFrame = true; - - public GrpcWebServletOutputStream(ServletOutputStream wrapped) { - this.wrapped = wrapped; - } - - @Override - public boolean isReady() { - return wrapped.isReady(); - } - - @Override - public void setWriteListener(WriteListener writeListener) { - wrapped.setWriteListener(writeListener); - } - - @Override - public void write(int i) throws IOException { - // TODO handle buffered impl too - - // intercept write and insert message framing - // if (readyForFrame) { - // wrapped.write(); - // } - wrapped.write(i); - } - - @Override - public void write(byte[] b) throws IOException { - wrapped.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - wrapped.write(b, off, len); - } - - @Override - public void flush() throws IOException { - super.flush(); - // ready to start a new frame - readyForFrame = true; - } - - @Override - public void close() throws IOException { - super.close(); - } -} From 71abaa0e66a91dfa09c2cb28b92e92f6b24d7f6a Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Mon, 8 Aug 2022 17:30:13 -0700 Subject: [PATCH 5/5] Fix io.grpc.servlet.jakarta.ServletAdapter#getHeaders for non-lowercased keys --- .../grpc/servlet/jakarta/ServletAdapter.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java index 7e6a992e290..99d66a624f1 100644 --- a/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java +++ b/grpc-java/grpc-servlet-jakarta/src/main/java/io/grpc/servlet/jakarta/ServletAdapter.java @@ -18,7 +18,6 @@ import io.grpc.ExperimentalApi; import io.grpc.Grpc; import io.grpc.InternalLogId; -import io.grpc.InternalMetadata; import io.grpc.Metadata; import io.grpc.ServerStreamTracer; import io.grpc.Status; @@ -38,11 +37,10 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Arrays; import java.util.Enumeration; import java.util.List; +import java.util.Locale; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -181,25 +179,26 @@ private static Metadata getHeaders(HttpServletRequest req) { Enumeration headerNames = req.getHeaderNames(); checkNotNull( headerNames, "Servlet container does not allow HttpServletRequest.getHeaderNames()"); - List byteArrays = new ArrayList<>(); + Metadata metadata = new Metadata(); while (headerNames.hasMoreElements()) { String headerName = headerNames.nextElement(); Enumeration values = req.getHeaders(headerName); if (values == null) { continue; } + headerName = headerName.toLowerCase(Locale.ROOT); + boolean isBinaryHeader = headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX); while (values.hasMoreElements()) { String value = values.nextElement(); - if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { - byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII)); - byteArrays.add(BaseEncoding.base64().decode(value)); + if (isBinaryHeader) { + metadata.put(Metadata.Key.of(headerName, Metadata.BINARY_BYTE_MARSHALLER), + BaseEncoding.base64().decode(value)); } else { - byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII)); - byteArrays.add(value.getBytes(StandardCharsets.US_ASCII)); + metadata.put(Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER), value); } } } - return InternalMetadata.newMetadata(byteArrays.toArray(new byte[][] {})); + return metadata; } // This method must use HttpRequest#getRequestURL or HttpUtils#getRequestURL, both of which