Skip to content

Commit

Permalink
[7.4.0] Use disk cache for HTTP cache server. (#23556)
Browse files Browse the repository at this point in the history
So that we can have proper cache eviction integration tests for HTTP
cache later.

PiperOrigin-RevId: 672501366
Change-Id: I52048a4498601cfa98cc9695d5ddddc5b93f56b9
  • Loading branch information
coeuvre authored Sep 9, 2024
1 parent 293c131 commit 846802a
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,15 @@ Path getTempPath() {

public Path toPath(Digest digest, Store store) {
String hash = digest.getHash();
return toPath(hash, store);
}

public Path toPath(String hash, Store store) {
// Create the file in a subfolder to bypass possible folder file count limits.
return storeRootMap.get(store).getChild(hash.substring(0, 2)).getChild(hash);
}

private void saveFile(Digest digest, Store store, InputStream in) throws IOException {
public void saveFile(Digest digest, Store store, InputStream in) throws IOException {
Path path = toPath(digest, store);

if (refresh(path)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
import com.google.devtools.build.lib.vfs.SyscallCache;
import com.google.devtools.build.remote.worker.http.HttpCacheServerHandler;
import com.google.devtools.build.remote.worker.http.InMemoryHttpCacheServerHandler;
import com.google.devtools.common.options.Options;
import com.google.protobuf.ByteString;
import io.netty.bootstrap.ServerBootstrap;
Expand Down Expand Up @@ -359,7 +359,7 @@ public void testSpawnCheckingCacheEvent() throws Exception {
ServerChannel server = null;
try {
ConcurrentHashMap<String, byte[]> cacheContents = new ConcurrentHashMap<>();
server = testServer.start(new HttpCacheServerHandler(cacheContents));
server = testServer.start(new InMemoryHttpCacheServerHandler(cacheContents));

HttpCacheClient blobStore =
createHttpBlobStore(
Expand All @@ -385,7 +385,7 @@ public void testUpload() throws Exception {
ServerChannel server = null;
try {
ConcurrentHashMap<String, byte[]> cacheContents = new ConcurrentHashMap<>();
server = testServer.start(new HttpCacheServerHandler(cacheContents));
server = testServer.start(new InMemoryHttpCacheServerHandler(cacheContents));

HttpCacheClient blobStore =
createHttpBlobStore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.remote.worker;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;

import build.bazel.remote.execution.v2.Digest;
Expand Down Expand Up @@ -98,4 +99,8 @@ public ListenableFuture<Void> uploadFile(
RemoteActionExecutionContext context, Digest digest, Path file) {
return uploadFile(context, digest, file, /* force= */ true);
}

public DiskCacheClient getDiskCacheClient() {
return (DiskCacheClient) cacheProtocol;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2024 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.remote.worker;

import build.bazel.remote.execution.v2.Digest;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.remote.Store;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.remote.worker.http.AbstractHttpCacheServerHandler;
import io.netty.channel.ChannelHandler.Sharable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.annotation.Nullable;

/** A simple HTTP REST disk cache used during test. */
@Sharable
public class OnDiskHttpCacheServerHandler extends AbstractHttpCacheServerHandler {
private final OnDiskBlobStoreCache cache;

public OnDiskHttpCacheServerHandler(OnDiskBlobStoreCache cache) {
this.cache = cache;
}

@Nullable
@Override
protected byte[] readFromCache(String uri) throws IOException {
var diskCache = cache.getDiskCacheClient();
Path path;
if (uri.startsWith("/ac/")) {
path = diskCache.toPath(uri.substring("/ac/".length()), Store.AC);
} else if (uri.startsWith("/cas/")) {
path = diskCache.toPath(uri.substring("/cas/".length()), Store.CAS);
} else {
throw new IOException("Invalid uri: " + uri);
}

try (var out = new ByteArrayOutputStream();
var in = path.getInputStream()) {
ByteStreams.copy(in, out);
return out.toByteArray();
}
}

@Override
protected void writeToCache(String uri, byte[] content) throws IOException {
var diskCache = cache.getDiskCacheClient();
Digest digest;
Store store;
if (uri.startsWith("/ac/")) {
digest = DigestUtil.buildDigest(uri.substring(4), content.length);
store = Store.AC;
} else if (uri.startsWith("/cas/")) {
digest = DigestUtil.buildDigest(uri.substring(5), content.length);
store = Store.CAS;
} else {
throw new IOException("Invalid uri: " + uri);
}

diskCache.saveFile(digest, store, new ByteArrayInputStream(content));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ public static void main(String[] args) throws Exception {
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpCacheServerInitializer());
.childHandler(new HttpCacheServerInitializer(new OnDiskHttpCacheServerHandler(cache)));
ch = b.bind(remoteWorkerOptions.httpListenPort).sync().channel();
logger.atInfo().log(
"Started HTTP cache server on port %d", remoteWorkerOptions.httpListenPort);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 The Bazel Authors. All rights reserved.
// Copyright 2024 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -11,16 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.remote.worker.http;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.flogger.GoogleLogger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
Expand All @@ -32,28 +30,21 @@
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/** A simple HTTP REST in-memory cache used during testing the LRE. */
@Sharable
public class HttpCacheServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
/**
* An abstract HTTP REST cache that convert HTTP requests to abstract methods {@link
* #readFromCache(String)} and {@link #writeToCache(String, byte[])}.
*/
public abstract class AbstractHttpCacheServerHandler
extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private static final Pattern URI_PATTERN = Pattern.compile("^/?(.*/)?(ac/|cas/)([a-f0-9]{64})$");

private final ConcurrentMap<String, byte[]> cache;

@VisibleForTesting
public HttpCacheServerHandler(ConcurrentMap<String, byte[]> cache) {
this.cache = Preconditions.checkNotNull(cache);
}

HttpCacheServerHandler() {
this(new ConcurrentHashMap<>());
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
if (!request.decoderResult().isSuccess()) {
Expand Down Expand Up @@ -82,7 +73,14 @@ private void handleGet(ChannelHandlerContext ctx, FullHttpRequest request) {
return;
}

byte[] contents = cache.get(request.uri());
byte[] contents;
try {
contents = readFromCache(request.uri());
} catch (IOException e) {
logger.atSevere().withCause(e).log();
sendError(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR);
return;
}

if (contents == null) {
sendError(ctx, request, HttpResponseStatus.NOT_FOUND);
Expand All @@ -101,6 +99,11 @@ private void handleGet(ChannelHandlerContext ctx, FullHttpRequest request) {
}
}

@Nullable
protected abstract byte[] readFromCache(String uri) throws IOException;

protected abstract void writeToCache(String uri, byte[] content) throws IOException;

private void handlePut(ChannelHandlerContext ctx, FullHttpRequest request) {
if (!request.decoderResult().isSuccess()) {
sendError(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR);
Expand All @@ -113,7 +116,13 @@ private void handlePut(ChannelHandlerContext ctx, FullHttpRequest request) {

byte[] contentBytes = new byte[request.content().readableBytes()];
request.content().readBytes(contentBytes);
cache.putIfAbsent(request.uri(), contentBytes);
try {
writeToCache(request.uri(), contentBytes);
} catch (IOException e) {
logger.atSevere().withCause(e).log();
sendError(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR);
return;
}

FullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
Expand All @@ -126,7 +135,7 @@ private void handlePut(ChannelHandlerContext ctx, FullHttpRequest request) {

private static void sendError(
ChannelHandlerContext ctx, FullHttpRequest request, HttpResponseStatus status) {
ByteBuf data = Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8);
ByteBuf data = Unpooled.copiedBuffer(status.reasonPhrase() + "\r\n", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, data);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, data.readableBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ java_library(
"//src/tools/remote:__subpackages__",
],
deps = [
"//third_party:flogger",
"//third_party:guava",
"//third_party:jsr305",
"//third_party:netty",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@

/** The initializer used by the HttpCacheServerHandler. */
public class HttpCacheServerInitializer extends ChannelInitializer<SocketChannel> {
private final HttpCacheServerHandler handler = new HttpCacheServerHandler();
private final AbstractHttpCacheServerHandler handler;

public HttpCacheServerInitializer(AbstractHttpCacheServerHandler handler) {
this.handler = handler;
}

@Override
protected void initChannel(SocketChannel ch) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2018 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.remote.worker.http;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.channel.ChannelHandler.Sharable;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;

/** A simple HTTP REST in-memory cache used during testing the LRE. */
@Sharable
public class InMemoryHttpCacheServerHandler extends AbstractHttpCacheServerHandler {

private final ConcurrentMap<String, byte[]> cache;

@VisibleForTesting
public InMemoryHttpCacheServerHandler(ConcurrentMap<String, byte[]> cache) {
this.cache = Preconditions.checkNotNull(cache);
}

@Nullable
@Override
protected byte[] readFromCache(String uri) {
return cache.get(uri);
}

@Override
protected void writeToCache(String uri, byte[] content) {
cache.putIfAbsent(uri, content);
}
}
Loading

0 comments on commit 846802a

Please sign in to comment.