Skip to content

Commit

Permalink
remote: fix timeout for http blob store
Browse files Browse the repository at this point in the history
Fixes #5440
Closes #7209
Closes #7040

Co-authored-by: Jakob Buchgraber <buchgr@google.com>

RELNOTES: Fixed a longstanding bug in the http remote cache where the value passed to
--remote_timeout would be interpreted as milliseconds instead of seconds.
PiperOrigin-RevId: 230579161
  • Loading branch information
nicolov authored and Copybara-Service committed Jan 23, 2019
1 parent 0eee6f7 commit 285c03e
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ public final class RemoteOptions extends OptionsBase {
public String remoteCache;

@Option(
name = "remote_timeout",
defaultValue = "60",
documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
effectTags = {OptionEffectTag.UNKNOWN},
help = "The maximum number of seconds to wait for remote execution and cache calls."
)
name = "remote_timeout",
defaultValue = "60",
documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
effectTags = {OptionEffectTag.UNKNOWN},
help =
"The maximum number of seconds to wait for remote execution and cache calls. For the "
+ "REST cache, this is both the connect and the read timeout.")
public int remoteTimeout;

@Option(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.netty.channel.unix.DomainSocketAddress;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
Expand All @@ -39,18 +38,21 @@ private SimpleBlobStoreFactory() {}
public static SimpleBlobStore createRest(RemoteOptions options, Credentials creds) {
try {
URI uri = URI.create(options.remoteHttpCache);
int timeoutMillis = (int) TimeUnit.SECONDS.toMillis(options.remoteTimeout);

if (options.remoteCacheProxy != null) {
if (options.remoteCacheProxy.startsWith("unix:")) {
return HttpBlobStore.create(
new DomainSocketAddress(options.remoteCacheProxy.replaceFirst("^unix:", "")),
uri, timeoutMillis, options.remoteMaxConnections, creds);
new DomainSocketAddress(options.remoteCacheProxy.replaceFirst("^unix:", "")),
uri,
options.remoteTimeout,
options.remoteMaxConnections,
creds);
} else {
throw new Exception("Remote cache proxy unsupported: " + options.remoteCacheProxy);
}
} else {
return HttpBlobStore.create(uri, timeoutMillis, options.remoteMaxConnections, creds);
return HttpBlobStore.create(
uri, options.remoteTimeout, options.remoteMaxConnections, creds);
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2018 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.remote.blobstore.http;

import java.io.IOException;

class DownloadTimeoutException extends IOException {

public DownloadTimeoutException(String url, long bytesReceived, long contentLength) {
super(buildMessage(url, bytesReceived, contentLength));
}

private static String buildMessage(String url, long bytesReceived, long contentLength) {
if (contentLength < 0) {
return String.format("Download of '%s' timed out. Received %d bytes.", url, bytesReceived);
} else {
return String.format(
"Download of '%s' timed out. Received %d/%d bytes.", url, bytesReceived, contentLength);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public final class HttpBlobStore implements SimpleBlobStore {
private final EventLoopGroup eventLoop;
private final ChannelPool channelPool;
private final URI uri;
private final int timeoutMillis;
private final int timeoutSeconds;
private final boolean useTls;

private final Object closeLock = new Object();
Expand All @@ -122,33 +122,45 @@ public final class HttpBlobStore implements SimpleBlobStore {
@GuardedBy("credentialsLock")
private long lastRefreshTime;

public static HttpBlobStore create(URI uri, int timeoutMillis,
int remoteMaxConnections, @Nullable final Credentials creds)
public static HttpBlobStore create(
URI uri, int timeoutSeconds, int remoteMaxConnections, @Nullable final Credentials creds)
throws Exception {
return new HttpBlobStore(
NioEventLoopGroup::new,
NioSocketChannel.class,
uri, timeoutMillis, remoteMaxConnections, creds,
uri,
timeoutSeconds,
remoteMaxConnections,
creds,
null);
}

public static HttpBlobStore create(
DomainSocketAddress domainSocketAddress,
URI uri, int timeoutMillis, int remoteMaxConnections, @Nullable final Credentials creds)
URI uri,
int timeoutSeconds,
int remoteMaxConnections,
@Nullable final Credentials creds)
throws Exception {

if (KQueue.isAvailable()) {
return new HttpBlobStore(
KQueueEventLoopGroup::new,
KQueueDomainSocketChannel.class,
uri, timeoutMillis, remoteMaxConnections, creds,
domainSocketAddress);
return new HttpBlobStore(
KQueueEventLoopGroup::new,
KQueueDomainSocketChannel.class,
uri,
timeoutSeconds,
remoteMaxConnections,
creds,
domainSocketAddress);
} else if (Epoll.isAvailable()) {
return new HttpBlobStore(
EpollEventLoopGroup::new,
EpollDomainSocketChannel.class,
uri, timeoutMillis, remoteMaxConnections, creds,
domainSocketAddress);
return new HttpBlobStore(
EpollEventLoopGroup::new,
EpollDomainSocketChannel.class,
uri,
timeoutSeconds,
remoteMaxConnections,
creds,
domainSocketAddress);
} else {
throw new Exception("Unix domain sockets are unsupported on this platform");
}
Expand All @@ -157,7 +169,10 @@ public static HttpBlobStore create(
private HttpBlobStore(
Function<Integer, EventLoopGroup> newEventLoopGroup,
Class<? extends Channel> channelClass,
URI uri, int timeoutMillis, int remoteMaxConnections, @Nullable final Credentials creds,
URI uri,
int timeoutSeconds,
int remoteMaxConnections,
@Nullable final Credentials creds,
@Nullable SocketAddress socketAddress)
throws Exception {
useTls = uri.getScheme().equals("https");
Expand Down Expand Up @@ -193,7 +208,7 @@ private HttpBlobStore(
Bootstrap clientBootstrap =
new Bootstrap()
.channel(channelClass)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * timeoutSeconds)
.group(eventLoop)
.remoteAddress(socketAddress);

Expand Down Expand Up @@ -221,7 +236,7 @@ public void channelCreated(Channel ch) {
channelPool = new SimpleChannelPool(clientBootstrap, channelPoolHandler);
}
this.creds = creds;
this.timeoutMillis = timeoutMillis;
this.timeoutSeconds = timeoutSeconds;
}

@SuppressWarnings("FutureReturnValueIgnored")
Expand Down Expand Up @@ -311,8 +326,7 @@ private Future<Channel> acquireDownloadChannel() {
return;
}

ch.pipeline()
.addFirst("read-timeout-handler", new ReadTimeoutHandler(timeoutMillis));
p.addFirst("read-timeout-handler", new ReadTimeoutHandler(timeoutSeconds));
p.addLast(new HttpClientCodec());
synchronized (credentialsLock) {
p.addLast(new HttpDownloadHandler(creds));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.internal.StringUtil;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -44,6 +45,11 @@ final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> {
private boolean downloadSucceeded;
private HttpResponse response;

private long bytesReceived;
private long contentLength = -1;
/** the path header in the http request */
private String path;

public HttpDownloadHandler(Credentials credentials) {
super(credentials);
}
Expand Down Expand Up @@ -72,13 +78,18 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
failAndClose(error, ctx);
return;
}
if (!HttpUtil.isContentLengthSet(response) && !HttpUtil.isTransferEncodingChunked(response)) {
boolean contentLengthSet = HttpUtil.isContentLengthSet(response);
if (!contentLengthSet && !HttpUtil.isTransferEncodingChunked(response)) {
HttpException error =
new HttpException(
response, "Missing 'Content-Length' or 'Transfer-Encoding: chunked' header", null);
failAndClose(error, ctx);
return;
}

if (contentLengthSet) {
contentLength = HttpUtil.getContentLength(response);
}
downloadSucceeded = response.status().equals(HttpResponseStatus.OK);
if (!downloadSucceeded) {
out = new ByteArrayOutputStream();
Expand All @@ -90,7 +101,9 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
checkState(response != null, "content before headers");

ByteBuf content = ((HttpContent) msg).content();
content.readBytes(out, content.readableBytes());
int readableBytes = content.readableBytes();
content.readBytes(out, readableBytes);
bytesReceived += readableBytes;
if (msg instanceof LastHttpContent) {
if (downloadSucceeded) {
succeedAndReset(ctx);
Expand Down Expand Up @@ -118,8 +131,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
"Unsupported message type: " + StringUtil.simpleClassName(msg)));
return;
}
out = ((DownloadCommand) msg).out();
HttpRequest request = buildRequest((DownloadCommand) msg);
DownloadCommand cmd = (DownloadCommand) msg;
out = cmd.out();
path = constructPath(cmd.uri(), cmd.hash(), cmd.casDownload());
HttpRequest request = buildRequest(path, constructHost(cmd.uri()));
addCredentialHeaders(request, ((DownloadCommand) msg).uri());
ctx.writeAndFlush(request)
.addListener(
Expand All @@ -130,13 +145,19 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
});
}

private HttpRequest buildRequest(DownloadCommand request) {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) {
if (t instanceof ReadTimeoutException) {
super.exceptionCaught(ctx, new DownloadTimeoutException(path, bytesReceived, contentLength));
} else {
super.exceptionCaught(ctx, t);
}
}

private HttpRequest buildRequest(String path, String host) {
HttpRequest httpRequest =
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
HttpMethod.GET,
constructPath(request.uri(), request.hash(), request.casDownload()));
httpRequest.headers().set(HttpHeaderNames.HOST, constructHost(request.uri()));
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
httpRequest.headers().set(HttpHeaderNames.HOST, host);
httpRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
httpRequest.headers().set(HttpHeaderNames.ACCEPT, "*/*");
return httpRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.ReadTimeoutException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
Expand Down Expand Up @@ -238,18 +237,19 @@ public HttpBlobStoreTest(TestServer testServer) {
this.testServer = testServer;
}

private HttpBlobStore createHttpBlobStore(ServerChannel serverChannel, int timeoutMillis,
int remoteMaxConnections, @Nullable final Credentials creds) throws Exception {
private HttpBlobStore createHttpBlobStore(
ServerChannel serverChannel, int timeoutSeconds, @Nullable final Credentials creds)
throws Exception {
SocketAddress socketAddress = serverChannel.localAddress();
if (socketAddress instanceof DomainSocketAddress) {
DomainSocketAddress domainSocketAddress = (DomainSocketAddress) socketAddress;
URI uri = new URI("http://localhost");
return HttpBlobStore.create(domainSocketAddress, uri, timeoutMillis, remoteMaxConnections,
creds);
return HttpBlobStore.create(
domainSocketAddress, uri, timeoutSeconds, /* remoteMaxConnections= */ 0, creds);
} else if (socketAddress instanceof InetSocketAddress) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
URI uri = new URI("http://localhost:" + inetSocketAddress.getPort());
return HttpBlobStore.create(uri, timeoutMillis, remoteMaxConnections, creds);
return HttpBlobStore.create(uri, timeoutSeconds, /* remoteMaxConnections= */ 0, creds);
} else {
throw new IllegalStateException(
"unsupported socket address class " + socketAddress.getClass());
Expand All @@ -262,14 +262,13 @@ public void timeoutShouldWork_connect() throws Exception {
testServer.stop(server);

Credentials credentials = newCredentials();
HttpBlobStore blobStore =
createHttpBlobStore(server, 5, 0, credentials);
HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
getFromFuture(blobStore.get("key", new ByteArrayOutputStream()));

fail("Exception expected");
}

@Test(expected = ReadTimeoutException.class, timeout = 30000)
@Test(expected = DownloadTimeoutException.class, timeout = 30000)
public void timeoutShouldWork_read() throws Exception {
ServerChannel server = null;
try {
Expand All @@ -284,8 +283,7 @@ protected void channelRead0(
});

Credentials credentials = newCredentials();
HttpBlobStore blobStore =
createHttpBlobStore(server, 5, 0, credentials);
HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
getFromFuture(blobStore.get("key", new ByteArrayOutputStream()));
fail("Exception expected");
} finally {
Expand All @@ -308,8 +306,7 @@ private void expiredAuthTokensShouldBeRetried_get(
server = testServer.start(new NotAuthorizedHandler(errorType));

Credentials credentials = newCredentials();
HttpBlobStore blobStore =
createHttpBlobStore(server, 30, 0, credentials);
HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream());
getFromFuture(blobStore.get("key", out));
assertThat(out.toString(Charsets.US_ASCII.name())).isEqualTo("File Contents");
Expand Down Expand Up @@ -339,8 +336,7 @@ private void expiredAuthTokensShouldBeRetried_put(
server = testServer.start(new NotAuthorizedHandler(errorType));

Credentials credentials = newCredentials();
HttpBlobStore blobStore =
createHttpBlobStore(server, 30, 0, credentials);
HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
ByteArrayInputStream in = new ByteArrayInputStream(data);
blobStore.put("key", data.length, in);
Expand Down Expand Up @@ -368,8 +364,7 @@ private void errorCodeThatShouldNotBeRetried_get(
server = testServer.start(new NotAuthorizedHandler(errorType));

Credentials credentials = newCredentials();
HttpBlobStore blobStore =
createHttpBlobStore(server, 30, 0, credentials);
HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
getFromFuture(blobStore.get("key", new ByteArrayOutputStream()));
fail("Exception expected.");
} catch (Exception e) {
Expand All @@ -396,8 +391,7 @@ private void errorCodeThatShouldNotBeRetried_put(
server = testServer.start(new NotAuthorizedHandler(errorType));

Credentials credentials = newCredentials();
HttpBlobStore blobStore =
createHttpBlobStore(server, 30, 0, credentials);
HttpBlobStore blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials);
blobStore.put("key", 1, new ByteArrayInputStream(new byte[]{0}));
fail("Exception expected.");
} catch (Exception e) {
Expand Down

0 comments on commit 285c03e

Please sign in to comment.