Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunk streaming request bodies only #157

Merged
merged 10 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -187,12 +185,10 @@ private <I> Request prepareBaseRequest(String method, String path, I in)
return new Request(method, path);
} else if (InputStream.class.isAssignableFrom(in.getClass())) {
InputStream body = (InputStream) in;
String debugBody = "<InputStream>";
return new Request(method, path, body, debugBody);
return new Request(method, path, body);
} else {
String debugBody = serialize(in);
InputStream body = new ByteArrayInputStream(debugBody.getBytes(StandardCharsets.UTF_8));
return new Request(method, path, body, debugBody);
String body = serialize(in);
return new Request(method, path, body);
}
}

Expand Down Expand Up @@ -303,11 +299,15 @@ private String makeLogRecord(Request in, Response out) {
in.getHeaders()
.forEach((header, value) -> sb.append(String.format("\n * %s: %s", header, value)));
}
String requestBody = in.getDebugBody();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used to be <InputStream>; I've changed it to (streamed body) in the debug log.

if (requestBody != null && !requestBody.isEmpty()) {
for (String line : bodyLogger.redactedDump(requestBody).split("\n")) {
sb.append("\n> ");
sb.append(line);
if (in.isBodyStreaming()) {
sb.append("\n> (streamed body)");
} else {
String requestBody = in.getBodyString();
if (requestBody != null && !requestBody.isEmpty()) {
for (String line : bodyLogger.redactedDump(requestBody).split("\n")) {
sb.append("\n> ");
sb.append(line);
}
}
}
sb.append("\n< ");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.databricks.sdk.core.http.Request;
import com.databricks.sdk.core.http.Response;
import com.databricks.sdk.core.utils.CustomCloseInputStream;
import com.databricks.sdk.mixin.ClustersExt;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
Expand All @@ -21,11 +22,15 @@
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommonsHttpClient implements HttpClient {
private static final Logger LOG = LoggerFactory.getLogger(ClustersExt.class);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not ClustersExt, but CommonsHttpClient

private final PoolingHttpClientConnectionManager connectionManager =
new PoolingHttpClientConnectionManager();
private final CloseableHttpClient hc;
Expand Down Expand Up @@ -117,18 +122,26 @@ private HttpUriRequest transformRequest(Request in) {
case Request.DELETE:
return new HttpDelete(in.getUri());
case Request.POST:
return withEntity(new HttpPost(in.getUri()), in.getBody());
return withEntity(new HttpPost(in.getUri()), in);
case Request.PUT:
return withEntity(new HttpPut(in.getUri()), in.getBody());
return withEntity(new HttpPut(in.getUri()), in);
case Request.PATCH:
return withEntity(new HttpPatch(in.getUri()), in.getBody());
return withEntity(new HttpPatch(in.getUri()), in);
default:
throw new IllegalArgumentException("Unknown method: " + in.getMethod());
}
}

private HttpRequestBase withEntity(HttpEntityEnclosingRequestBase request, InputStream body) {
request.setEntity(new InputStreamEntity(body));
private HttpRequestBase withEntity(HttpEntityEnclosingRequestBase request, Request in) {
if (in.isBodyString()) {
request.setEntity(new StringEntity(in.getBodyString(), StandardCharsets.UTF_8));
} else if (in.isBodyStreaming()) {
request.setEntity(new InputStreamEntity(in.getBodyStream()));
} else {
LOG.warn(
"withEntity called with a request with no body, so no request entity will be set. URI: {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now, withEntity is only being called in POST / PUT and PATCH right? Is this pathway only for the PUT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withEntity should be called for any HTTP methods that have a request body, i.e. POST/PUT/PATCH. GET/DELETE and other methods we may support in the future like HEAD/OPTIONS don't support request bodies, so we don't use this method.

in.getUri());
}
return request;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.databricks.sdk.core.http;

import com.databricks.sdk.core.DatabricksException;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
Expand All @@ -19,26 +18,50 @@ public class Request {
private String url;
private final Map<String, String> headers = new HashMap<>();
private final Map<String, List<String>> query = new TreeMap<>();
private final InputStream body;
private final String debugBody;
/**
* The body of the request for requests with streaming bodies. At most one of {@link #bodyStream}
* and {@link #bodyString} can be non-null.
*/
private final InputStream bodyStream;
/**
* The body of the request for requests with string bodies. At most one of {@link #bodyStream} and
* {@link #bodyString} can be non-null.
*/
private final String bodyString;
/**
* Whether the body of the request is a streaming body. At most one of {@link #isBodyStreaming}
* and {@link #isBodyString} can be true.
*/
private final boolean isBodyStreaming;
/**
* Whether the body of the request is a string body. At most one of {@link #isBodyStreaming} and
* {@link #isBodyString} can be true.
*/
private final boolean isBodyString;

public Request(String method, String url) {
this(method, url, (String) null);
this(method, url, null, null);
}

public Request(String method, String url, String body) {
this.method = method;
this.url = url;
this.body =
body != null ? new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)) : null;
this.debugBody = body;
public Request(String method, String url, String bodyString) {
this(method, url, null, bodyString);
}

public Request(String method, String url, InputStream body, String debugBody) {
public Request(String method, String url, InputStream bodyStream) {
this(method, url, bodyStream, null);
}

private Request(String method, String url, InputStream bodyStream, String bodyString) {
if (bodyStream != null && bodyString != null) {
throw new IllegalArgumentException(
"At most one of bodyStream and bodyString can be non-null");
}
this.method = method;
this.url = url;
this.body = body;
this.debugBody = debugBody;
this.bodyStream = bodyStream;
this.bodyString = bodyString;
this.isBodyStreaming = bodyStream != null;
this.isBodyString = bodyString != null;
}

public Request withHeaders(Map<String, String> headers) {
Expand Down Expand Up @@ -135,12 +158,20 @@ public Map<String, List<String>> getQuery() {
return query;
}

public InputStream getBody() {
return body;
public InputStream getBodyStream() {
return bodyStream;
}

public String getBodyString() {
return bodyString;
}

public boolean isBodyStreaming() {
return isBodyStreaming;
}

public String getDebugBody() {
return debugBody;
public boolean isBodyString() {
return isBodyString;
}

@Override
Expand All @@ -151,15 +182,15 @@ public boolean equals(Object o) {
return method.equals(request.method)
&& url.equals(request.url)
&& Objects.equals(query, request.query)
&& Objects.equals(body, request.body);
&& Objects.equals(bodyStream, request.bodyStream);
}

@Override
public int hashCode() {
// Note: this is not safe for production, as debugBody will be the same for different requests
// Note: this is not safe for production, as bodyString will be null for different requests
// using InputStream.
// It is currently only used in tests.
return Objects.hash(method, url, query, debugBody);
return Objects.hash(method, url, query, bodyString);
}

@Override
Expand Down
Loading