Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds a ZeroCopy response marshaller for grpc ReadObject handling #2489

Merged
merged 11 commits into from
Apr 14, 2024
8 changes: 8 additions & 0 deletions google-cloud-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,23 @@ public static GapicDownloadSessionBuilder create() {
* ultimately produced channel will not do any retries of its own.
*/
public ReadableByteChannelSessionBuilder byteChannel(
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read) {
return new ReadableByteChannelSessionBuilder(read);
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
ResponseContentLifecycleManager responseContentLifecycleManager) {
return new ReadableByteChannelSessionBuilder(read, responseContentLifecycleManager);
}

public static final class ReadableByteChannelSessionBuilder {

private final ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
private final ResponseContentLifecycleManager responseContentLifecycleManager;
private boolean autoGzipDecompression;
private Hasher hasher;

private ReadableByteChannelSessionBuilder(
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read) {
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
ResponseContentLifecycleManager responseContentLifecycleManager) {
this.read = read;
this.responseContentLifecycleManager = responseContentLifecycleManager;
this.hasher = Hasher.noop();
this.autoGzipDecompression = false;
}
Expand Down Expand Up @@ -100,11 +104,13 @@ public UnbufferedReadableByteChannelSessionBuilder unbuffered() {
return (object, resultFuture) -> {
if (autoGzipDecompression) {
return new GzipReadableByteChannel(
new GapicUnbufferedReadableByteChannel(resultFuture, read, object, hasher),
new GapicUnbufferedReadableByteChannel(
resultFuture, read, object, hasher, responseContentLifecycleManager),
ApiFutures.transform(
resultFuture, Object::getContentEncoding, MoreExecutors.directExecutor()));
} else {
return new GapicUnbufferedReadableByteChannel(resultFuture, read, object, hasher);
return new GapicUnbufferedReadableByteChannel(
resultFuture, read, object, hasher, responseContentLifecycleManager);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@

package com.google.cloud.storage;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.http.HttpStatusCodes;
import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
import com.google.protobuf.ByteString;
import com.google.storage.v2.ChecksummedData;
import com.google.storage.v2.Object;
import com.google.storage.v2.ReadObjectRequest;
Expand All @@ -46,25 +45,28 @@ final class GapicUnbufferedReadableByteChannel
private final ReadObjectRequest req;
private final Hasher hasher;
private final LazyServerStreamIterator iter;
private final ResponseContentLifecycleManager rclm;

private boolean open = true;
private boolean complete = false;
private long blobOffset;

private Object metadata;

private ByteBuffer leftovers;
private ResponseContentLifecycleHandle leftovers;

GapicUnbufferedReadableByteChannel(
SettableApiFuture<Object> result,
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
ReadObjectRequest req,
Hasher hasher) {
Hasher hasher,
ResponseContentLifecycleManager rclm) {
this.result = result;
this.read = read;
this.req = req;
this.hasher = hasher;
this.blobOffset = req.getReadOffset();
this.rclm = rclm;
this.iter = new LazyServerStreamIterator();
}

Expand All @@ -82,15 +84,17 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
ReadCursor c = new ReadCursor(blobOffset, blobOffset + totalBufferCapacity);
while (c.hasRemaining()) {
if (leftovers != null) {
copy(c, leftovers, dsts, offset, length);
leftovers.copy(c, dsts, offset, length);
if (!leftovers.hasRemaining()) {
leftovers.close();
leftovers = null;
}
continue;
}

if (iter.hasNext()) {
ReadObjectResponse resp = iter.next();
ResponseContentLifecycleHandle handle = rclm.get(resp);
if (resp.hasMetadata()) {
Object respMetadata = resp.getMetadata();
if (metadata == null) {
Expand All @@ -107,22 +111,24 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
}
}
ChecksummedData checksummedData = resp.getChecksummedData();
ByteBuffer content = checksummedData.getContent().asReadOnlyByteBuffer();
// very important to know whether a crc32c value is set. Without checking, protobuf will
ByteString content = checksummedData.getContent();
int contentSize = content.size();
// Very important to know whether a crc32c value is set. Without checking, protobuf will
// happily return 0, which is a valid crc32c value.
if (checksummedData.hasCrc32C()) {
Crc32cLengthKnown expected =
Crc32cValue.of(checksummedData.getCrc32C(), checksummedData.getContent().size());
Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize);
try {
hasher.validate(expected, content::duplicate);
hasher.validate(expected, content.asReadOnlyByteBufferList());
} catch (IOException e) {
close();
throw e;
}
}
copy(c, content, dsts, offset, length);
if (content.hasRemaining()) {
leftovers = content;
handle.copy(c, dsts, offset, length);
if (handle.hasRemaining()) {
leftovers = handle;
} else {
handle.close();
}
} else {
complete = true;
Expand All @@ -144,59 +150,26 @@ public boolean isOpen() {
@Override
public void close() throws IOException {
open = false;
iter.close();
try {
if (leftovers != null) {
leftovers.close();
}
} finally {
iter.close();
}
}

ApiFuture<Object> getResult() {
return result;
}

private void copy(ReadCursor c, ByteBuffer content, ByteBuffer[] dsts, int offset, int length) {
long copiedBytes = Buffers.copy(content, dsts, offset, length);
c.advance(copiedBytes);
}

private IOException closeWithError(String message) throws IOException {
close();
StorageException cause =
new StorageException(HttpStatusCodes.STATUS_CODE_PRECONDITION_FAILED, message);
throw new IOException(message, cause);
}

/**
* Shrink wraps a beginning, offset and limit for tracking state of an individual invocation of
* {@link #read}
*/
private static final class ReadCursor {
private final long beginning;
private long offset;
private final long limit;

private ReadCursor(long beginning, long limit) {
this.limit = limit;
this.beginning = beginning;
this.offset = beginning;
}

public boolean hasRemaining() {
return limit - offset > 0;
}

public void advance(long incr) {
checkArgument(incr >= 0);
offset += incr;
}

public long read() {
return offset - beginning;
}

@Override
public String toString() {
return String.format("ReadCursor{begin=%d, offset=%d, limit=%d}", beginning, offset, limit);
}
}

private final class LazyServerStreamIterator implements Iterator<ReadObjectResponse>, Closeable {
private ServerStream<ReadObjectResponse> serverStream;
private Iterator<ReadObjectResponse> responseIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@
final class GrpcBlobReadChannel extends BaseStorageReadChannel<Object> {

private final ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
private final ResponseContentLifecycleManager responseContentLifecycleManager;
private final ReadObjectRequest request;
private final boolean autoGzipDecompression;

GrpcBlobReadChannel(
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
ResponseContentLifecycleManager responseContentLifecycleManager,
ReadObjectRequest request,
boolean autoGzipDecompression) {
super(Conversions.grpc().blobInfo());
this.read = read;
this.responseContentLifecycleManager = responseContentLifecycleManager;
this.request = request;
this.autoGzipDecompression = autoGzipDecompression;
}
Expand All @@ -53,7 +56,7 @@ protected LazyReadChannel<?, Object> newLazyReadChannel() {
ReadableByteChannelSessionBuilder b =
ResumableMedia.gapic()
.read()
.byteChannel(read)
.byteChannel(read, responseContentLifecycleManager)
.setHasher(Hasher.noop())
.setAutoGzipDecompression(autoGzipDecompression);
BufferHandle bufferHandle = getBufferHandle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
.collect(ImmutableSet.toImmutableSet())));

final StorageClient storageClient;
final ResponseContentLifecycleManager responseContentLifecycleManager;
final WriterFactory writerFactory;
final GrpcConversions codecs;
final GrpcRetryAlgorithmManager retryAlgorithmManager;
Expand All @@ -192,10 +193,12 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
GrpcStorageImpl(
GrpcStorageOptions options,
StorageClient storageClient,
ResponseContentLifecycleManager responseContentLifecycleManager,
WriterFactory writerFactory,
Opts<UserProject> defaultOpts) {
super(options);
this.storageClient = storageClient;
this.responseContentLifecycleManager = responseContentLifecycleManager;
this.writerFactory = writerFactory;
this.defaultOpts = defaultOpts;
this.codecs = Conversions.grpc();
Expand Down Expand Up @@ -716,8 +719,10 @@ public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
ReadObjectRequest request = getReadObjectRequest(blob, opts);
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request));
GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes);

return new GrpcBlobReadChannel(
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
responseContentLifecycleManager,
request,
!opts.autoGzipDecompression());
}
Expand Down Expand Up @@ -1868,7 +1873,9 @@ private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes));
return ResumableMedia.gapic()
.read()
.byteChannel(storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext))
.byteChannel(
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
responseContentLifecycleManager)
.setAutoGzipDecompression(!opts.autoGzipDecompression())
.unbuffered()
.setReadObjectRequest(readObjectRequest)
Expand Down
Loading
Loading