From 92c6ed1855c9eec9df5c037ceedc0747fa192214 Mon Sep 17 00:00:00 2001 From: Jesse Lovelace Date: Thu, 25 Jan 2024 14:11:53 -0800 Subject: [PATCH 01/11] feat: Adds a ZeroCopy response marshaller for grpc ReadObject handling --- google-cloud-storage/pom.xml | 4 + .../storage/GapicDownloadSessionBuilder.java | 16 +- .../GapicUnbufferedReadableByteChannel.java | 160 ++++++---- .../cloud/storage/GrpcBlobReadChannel.java | 5 +- .../google/cloud/storage/GrpcStorageImpl.java | 9 +- .../cloud/storage/GrpcStorageOptions.java | 274 +++++++++++++++++- .../java/com/google/cloud/storage/Hasher.java | 25 +- ...apicUnbufferedReadableByteChannelTest.java | 15 +- .../ITGzipReadableByteChannelTest.java | 22 +- .../storage/TransportCompatibilityTest.java | 4 +- .../cloud/storage/ZeroCopyMarshallerTest.java | 121 ++++++++ 11 files changed, 579 insertions(+), 76 deletions(-) create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java diff --git a/google-cloud-storage/pom.xml b/google-cloud-storage/pom.xml index 7fbba0aeeb..87deadffb5 100644 --- a/google-cloud-storage/pom.xml +++ b/google-cloud-storage/pom.xml @@ -96,6 +96,10 @@ com.google.protobuf protobuf-java-util + + io.grpc + grpc-protobuf + com.google.api.grpc proto-google-common-protos diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java index 704794a389..82789bcd2f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java @@ -48,19 +48,23 @@ public static GapicDownloadSessionBuilder create() { * ultimately produced channel will not do any retries of its own. */ public ReadableByteChannelSessionBuilder byteChannel( - ServerStreamingCallable read) { - return new ReadableByteChannelSessionBuilder(read); + ServerStreamingCallable read, + ResponseContentLifecycleManager responseContentLifecycleManager) { + return new ReadableByteChannelSessionBuilder(read, responseContentLifecycleManager); } public static final class ReadableByteChannelSessionBuilder { private final ServerStreamingCallable read; + private final ResponseContentLifecycleManager responseContentLifecycleManager; private boolean autoGzipDecompression; private Hasher hasher; private ReadableByteChannelSessionBuilder( - ServerStreamingCallable read) { + ServerStreamingCallable read, + ResponseContentLifecycleManager responseContentLifecycleManager) { this.read = read; + this.responseContentLifecycleManager = responseContentLifecycleManager; this.hasher = Hasher.noop(); this.autoGzipDecompression = false; } @@ -100,11 +104,13 @@ public UnbufferedReadableByteChannelSessionBuilder unbuffered() { return (object, resultFuture) -> { if (autoGzipDecompression) { return new GzipReadableByteChannel( - new GapicUnbufferedReadableByteChannel(resultFuture, read, object, hasher), + new GapicUnbufferedReadableByteChannel( + resultFuture, read, object, hasher, responseContentLifecycleManager), ApiFutures.transform( resultFuture, Object::getContentEncoding, MoreExecutors.directExecutor())); } else { - return new GapicUnbufferedReadableByteChannel(resultFuture, read, object, hasher); + return new GapicUnbufferedReadableByteChannel( + resultFuture, read, object, hasher, responseContentLifecycleManager); } }; } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index 4b19c3f998..c08e633d2e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -25,6 +25,7 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel; +import com.google.protobuf.ByteString; import com.google.storage.v2.ChecksummedData; import com.google.storage.v2.Object; import com.google.storage.v2.ReadObjectRequest; @@ -37,6 +38,8 @@ import java.nio.channels.ScatteringByteChannel; import java.util.Arrays; import java.util.Iterator; +import java.util.List; +import org.checkerframework.checker.nullness.qual.Nullable; final class GapicUnbufferedReadableByteChannel implements UnbufferedReadableByteChannel, ScatteringByteChannel { @@ -46,6 +49,7 @@ final class GapicUnbufferedReadableByteChannel private final ReadObjectRequest req; private final Hasher hasher; private final LazyServerStreamIterator iter; + private final ResponseContentLifecycleManager rclm; private boolean open = true; private boolean complete = false; @@ -53,18 +57,20 @@ final class GapicUnbufferedReadableByteChannel private Object metadata; - private ByteBuffer leftovers; + private ResponseContentLifecycleHandle leftovers; GapicUnbufferedReadableByteChannel( SettableApiFuture result, ServerStreamingCallable read, ReadObjectRequest req, - Hasher hasher) { + Hasher hasher, + ResponseContentLifecycleManager rclm) { this.result = result; this.read = read; this.req = req; this.hasher = hasher; this.blobOffset = req.getReadOffset(); + this.rclm = rclm; this.iter = new LazyServerStreamIterator(); } @@ -82,8 +88,9 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { ReadCursor c = new ReadCursor(blobOffset, blobOffset + totalBufferCapacity); while (c.hasRemaining()) { if (leftovers != null) { - copy(c, leftovers, dsts, offset, length); + leftovers.copy(c, dsts, offset, length); if (!leftovers.hasRemaining()) { + leftovers.close(); leftovers = null; } continue; @@ -91,6 +98,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { if (iter.hasNext()) { ReadObjectResponse resp = iter.next(); + ResponseContentLifecycleHandle handle = rclm.get(resp); if (resp.hasMetadata()) { Object respMetadata = resp.getMetadata(); if (metadata == null) { @@ -107,22 +115,24 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { } } ChecksummedData checksummedData = resp.getChecksummedData(); - ByteBuffer content = checksummedData.getContent().asReadOnlyByteBuffer(); - // very important to know whether a crc32c value is set. Without checking, protobuf will + ByteString content = checksummedData.getContent(); + int contentSize = content.size(); + // Very important to know whether a crc32c value is set. Without checking, protobuf will // happily return 0, which is a valid crc32c value. if (checksummedData.hasCrc32C()) { - Crc32cLengthKnown expected = - Crc32cValue.of(checksummedData.getCrc32C(), checksummedData.getContent().size()); + Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize); try { - hasher.validate(expected, content::duplicate); + hasher.validate(expected, content.asReadOnlyByteBufferList()); } catch (IOException e) { close(); throw e; } } - copy(c, content, dsts, offset, length); - if (content.hasRemaining()) { - leftovers = content; + handle.copy(c, dsts, offset, length); + if (handle.hasRemaining()) { + leftovers = handle; + } else { + handle.close(); } } else { complete = true; @@ -144,18 +154,19 @@ public boolean isOpen() { @Override public void close() throws IOException { open = false; - iter.close(); + try { + if (leftovers != null) { + leftovers.close(); + } + } finally { + iter.close(); + } } ApiFuture getResult() { return result; } - private void copy(ReadCursor c, ByteBuffer content, ByteBuffer[] dsts, int offset, int length) { - long copiedBytes = Buffers.copy(content, dsts, offset, length); - c.advance(copiedBytes); - } - private IOException closeWithError(String message) throws IOException { close(); StorageException cause = @@ -163,40 +174,6 @@ private IOException closeWithError(String message) throws IOException { throw new IOException(message, cause); } - /** - * Shrink wraps a beginning, offset and limit for tracking state of an individual invocation of - * {@link #read} - */ - private static final class ReadCursor { - private final long beginning; - private long offset; - private final long limit; - - private ReadCursor(long beginning, long limit) { - this.limit = limit; - this.beginning = beginning; - this.offset = beginning; - } - - public boolean hasRemaining() { - return limit - offset > 0; - } - - public void advance(long incr) { - checkArgument(incr >= 0); - offset += incr; - } - - public long read() { - return offset - beginning; - } - - @Override - public String toString() { - return String.format("ReadCursor{begin=%d, offset=%d, limit=%d}", beginning, offset, limit); - } - } - private final class LazyServerStreamIterator implements Iterator, Closeable { private ServerStream serverStream; private Iterator responseIterator; @@ -254,3 +231,84 @@ private Iterator ensureResponseIteratorOpen() { } } } + +/** + * Shrink wraps a beginning, offset and limit for tracking state of an individual invocation of + * {@link #read} + */ +final class ReadCursor { + private final long beginning; + private long offset; + private final long limit; + + ReadCursor(long beginning, long limit) { + this.limit = limit; + this.beginning = beginning; + this.offset = beginning; + } + + public boolean hasRemaining() { + return limit - offset > 0; + } + + public void advance(long incr) { + checkArgument(incr >= 0); + offset += incr; + } + + public long read() { + return offset - beginning; + } + + @Override + public String toString() { + return String.format("ReadCursor{begin=%d, offset=%d, limit=%d}", beginning, offset, limit); + } +} + +interface ResponseContentLifecycleManager { + ResponseContentLifecycleHandle get(ReadObjectResponse response); + + static ResponseContentLifecycleManager noop() { + return response -> + new ResponseContentLifecycleHandle( + response, + () -> { + // no-op + }); + } +} + +final class ResponseContentLifecycleHandle implements Closeable { + @Nullable private final Closeable dispose; + + private final List buffers; + + ResponseContentLifecycleHandle(ReadObjectResponse resp, @Nullable Closeable dispose) { + this.dispose = dispose; + + this.buffers = resp.getChecksummedData().getContent().asReadOnlyByteBufferList(); + } + + void copy(ReadCursor c, ByteBuffer[] dsts, int offset, int length) { + for (ByteBuffer b : buffers) { + long copiedBytes = Buffers.copy(b, dsts, offset, length); + c.advance(copiedBytes); + if (b.hasRemaining()) break; + } + } + + boolean hasRemaining() { + for (ByteBuffer b : buffers) { + if (b.hasRemaining()) return true; + } + return false; + } + + @Override + public void close() throws IOException { + if (dispose != null) { + dispose.close(); + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java index 4ae3f24466..03e3cb517a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java @@ -28,15 +28,18 @@ final class GrpcBlobReadChannel extends BaseStorageReadChannel { private final ServerStreamingCallable read; + private final ResponseContentLifecycleManager responseContentLifecycleManager; private final ReadObjectRequest request; private final boolean autoGzipDecompression; GrpcBlobReadChannel( ServerStreamingCallable read, + ResponseContentLifecycleManager responseContentLifecycleManager, ReadObjectRequest request, boolean autoGzipDecompression) { super(Conversions.grpc().blobInfo()); this.read = read; + this.responseContentLifecycleManager = responseContentLifecycleManager; this.request = request; this.autoGzipDecompression = autoGzipDecompression; } @@ -53,7 +56,7 @@ protected LazyReadChannel newLazyReadChannel() { ReadableByteChannelSessionBuilder b = ResumableMedia.gapic() .read() - .byteChannel(read) + .byteChannel(read, responseContentLifecycleManager) .setHasher(Hasher.noop()) .setAutoGzipDecompression(autoGzipDecompression); BufferHandle bufferHandle = getBufferHandle(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index e9857e93d5..e5e81c060e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -180,6 +180,7 @@ final class GrpcStorageImpl extends BaseService .collect(ImmutableSet.toImmutableSet()))); final StorageClient storageClient; + final ResponseContentLifecycleManager responseContentLifecycleManager; final WriterFactory writerFactory; final GrpcConversions codecs; final GrpcRetryAlgorithmManager retryAlgorithmManager; @@ -192,10 +193,12 @@ final class GrpcStorageImpl extends BaseService GrpcStorageImpl( GrpcStorageOptions options, StorageClient storageClient, + ResponseContentLifecycleManager responseContentLifecycleManager, WriterFactory writerFactory, Opts defaultOpts) { super(options); this.storageClient = storageClient; + this.responseContentLifecycleManager = responseContentLifecycleManager; this.writerFactory = writerFactory; this.defaultOpts = defaultOpts; this.codecs = Conversions.grpc(); @@ -716,8 +719,10 @@ public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) { ReadObjectRequest request = getReadObjectRequest(blob, opts); Set codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request)); GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes); + return new GrpcBlobReadChannel( storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext), + responseContentLifecycleManager, request, !opts.autoGzipDecompression()); } @@ -1868,7 +1873,9 @@ private UnbufferedReadableByteChannelSession unbufferedReadSession( opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes)); return ResumableMedia.gapic() .read() - .byteChannel(storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext)) + .byteChannel( + storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext), + responseContentLifecycleManager) .setAutoGzipDecompression(!opts.autoGzipDecompression()) .unbuffered() .setReadObjectRequest(readObjectRequest) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index 841343d311..ed03e553aa 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -25,14 +25,20 @@ import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcCallSettings; import com.google.api.gax.grpc.GrpcInterceptorProvider; +import com.google.api.gax.grpc.GrpcStubCallableFactory; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.NoHeaderProvider; +import com.google.api.gax.rpc.RequestParamsBuilder; +import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode.Code; import com.google.api.gax.rpc.internal.QuotaProjectIdHidingCredentials; +import com.google.api.pathtemplate.PathTemplate; import com.google.auth.Credentials; import com.google.cloud.NoCredentials; import com.google.cloud.ServiceFactory; @@ -47,21 +53,45 @@ import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.cloud.storage.UnifiedOpts.UserProject; import com.google.cloud.storage.spi.StorageRpcFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; +import com.google.protobuf.UnsafeByteOperations; import com.google.storage.v2.ReadObjectRequest; import com.google.storage.v2.ReadObjectResponse; import com.google.storage.v2.StorageClient; import com.google.storage.v2.StorageSettings; +import com.google.storage.v2.stub.GrpcStorageCallableFactory; +import com.google.storage.v2.stub.GrpcStorageStub; +import com.google.storage.v2.stub.StorageStub; +import com.google.storage.v2.stub.StorageStubSettings; import io.grpc.ClientInterceptor; +import io.grpc.Detachable; +import io.grpc.HasByteBuffer; +import io.grpc.KnownLength; import io.grpc.ManagedChannelBuilder; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.protobuf.ProtoUtils; +import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; import java.io.Serializable; +import java.io.UncheckedIOException; import java.net.URI; +import java.nio.ByteBuffer; import java.time.Clock; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -658,11 +688,33 @@ public Storage create(StorageOptions options) { Tuple> t = grpcStorageOptions.resolveSettingsAndOpts(); StorageSettings storageSettings = t.x(); Opts defaultOpts = t.y(); - return new GrpcStorageImpl( - grpcStorageOptions, - StorageClient.create(storageSettings), - grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()), - defaultOpts); + StorageClient client; + if (ZeroCopyReadinessChecker.isReady()) { + StorageStubSettings stubSettings = + (StorageStubSettings) storageSettings.getStubSettings(); + ClientContext clientContext = ClientContext.create(stubSettings); + GrpcStorageCallableFactory grpcStorageCallableFactory = + new GrpcStorageCallableFactory(); + InternalZeroCopyGrpcStorageStub stub = + new InternalZeroCopyGrpcStorageStub( + stubSettings, clientContext, grpcStorageCallableFactory); + client = new InternalStorageClient(stub); + return new GrpcStorageImpl( + grpcStorageOptions, + client, + stub.getObjectMediaResponseMarshaller, + grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()), + defaultOpts); + } else { + client = StorageClient.create(storageSettings); + return new GrpcStorageImpl( + grpcStorageOptions, + client, + ResponseContentLifecycleManager.noop(), + grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()), + defaultOpts); + } + } catch (IOException e) { throw new IllegalStateException( "Unable to instantiate gRPC com.google.cloud.storage.Storage client.", e); @@ -780,4 +832,216 @@ private Object readResolve() { return INSTANCE; } } + + private static final class InternalStorageClient extends StorageClient { + + private InternalStorageClient(StorageStub stub) { + super(stub); + } + } + + private static final class InternalZeroCopyGrpcStorageStub extends GrpcStorageStub + implements AutoCloseable { + private final ReadObjectResponseZeroCopyMessageMarshaller getObjectMediaResponseMarshaller; + + private final ServerStreamingCallable + serverStreamingCallable; + + private InternalZeroCopyGrpcStorageStub( + StorageStubSettings settings, + ClientContext clientContext, + GrpcStubCallableFactory callableFactory) + throws IOException { + super(settings, clientContext, callableFactory); + + this.getObjectMediaResponseMarshaller = + new ReadObjectResponseZeroCopyMessageMarshaller(ReadObjectResponse.getDefaultInstance()); + + MethodDescriptor readObjectMethodDescriptor = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.SERVER_STREAMING) + .setFullMethodName("google.storage.v2.Storage/ReadObject") + .setRequestMarshaller(ProtoUtils.marshaller(ReadObjectRequest.getDefaultInstance())) + .setResponseMarshaller(getObjectMediaResponseMarshaller) + .build(); + + GrpcCallSettings readObjectTransportSettings = + GrpcCallSettings.newBuilder() + .setMethodDescriptor(readObjectMethodDescriptor) + .setParamsExtractor( + request -> { + RequestParamsBuilder builder = RequestParamsBuilder.create(); + // todo: this is fragile to proto annotation changes, and would require manual + // maintenance + builder.add(request.getBucket(), "bucket", PathTemplate.create("{bucket=**}")); + return builder.build(); + }) + .build(); + + this.serverStreamingCallable = + callableFactory.createServerStreamingCallable( + readObjectTransportSettings, settings.readObjectSettings(), clientContext); + } + + @Override + public ServerStreamingCallable readObjectCallable() { + return serverStreamingCallable; + } + } + + @VisibleForTesting + static class ReadObjectResponseZeroCopyMessageMarshaller + implements MethodDescriptor.PrototypeMarshaller, + ResponseContentLifecycleManager, + Closeable { + private final Map unclosedStreams; + private final Parser parser; + private final MethodDescriptor.PrototypeMarshaller baseMarshaller; + + ReadObjectResponseZeroCopyMessageMarshaller(ReadObjectResponse defaultInstance) { + parser = defaultInstance.getParserForType(); + baseMarshaller = + (MethodDescriptor.PrototypeMarshaller) + ProtoUtils.marshaller(defaultInstance); + unclosedStreams = Collections.synchronizedMap(new IdentityHashMap<>()); + } + + @Override + public Class getMessageClass() { + return baseMarshaller.getMessageClass(); + } + + @Override + public ReadObjectResponse getMessagePrototype() { + return baseMarshaller.getMessagePrototype(); + } + + @Override + public InputStream stream(ReadObjectResponse value) { + return baseMarshaller.stream(value); + } + + @Override + public ReadObjectResponse parse(InputStream stream) { + CodedInputStream cis = null; + try { + if (stream instanceof KnownLength + && stream instanceof Detachable + && stream instanceof HasByteBuffer + && ((HasByteBuffer) stream).byteBufferSupported()) { + int size = stream.available(); + // Stream is now detached here and should be closed later. + stream = ((Detachable) stream).detach(); + // This mark call is to keep buffer while traversing buffers using skip. + stream.mark(size); + List byteStrings = new ArrayList<>(); + while (stream.available() != 0) { + ByteBuffer buffer = ((HasByteBuffer) stream).getByteBuffer(); + byteStrings.add(UnsafeByteOperations.unsafeWrap(buffer)); + stream.skip(buffer.remaining()); + } + stream.reset(); + cis = ByteString.copyFrom(byteStrings).newCodedInput(); + cis.enableAliasing(true); + cis.setSizeLimit(Integer.MAX_VALUE); + } + } catch (IOException e) { + throw Status.INTERNAL + .withDescription("Error parsing input stream for ReadObject") + .withCause(e) + .asRuntimeException(); + } + if (cis != null) { + // fast path (no memory copy) + ReadObjectResponse message; + try { + message = parseFrom(cis); + } catch (InvalidProtocolBufferException ipbe) { + throw Status.INTERNAL + .withDescription("Invalid protobuf byte sequence for ReadObject") + .withCause(ipbe) + .asRuntimeException(); + } + unclosedStreams.put(message, stream); + return message; + } else { + // slow path + return baseMarshaller.parse(stream); + } + } + + private ReadObjectResponse parseFrom(CodedInputStream stream) + throws InvalidProtocolBufferException { + ReadObjectResponse message = parser.parseFrom(stream); + try { + stream.checkLastTagWas(0); + return message; + } catch (InvalidProtocolBufferException e) { + e.setUnfinishedMessage(message); + throw e; + } + } + + @Override + public ResponseContentLifecycleHandle get(ReadObjectResponse response) { + InputStream stream = unclosedStreams.remove(response); + return new ResponseContentLifecycleHandle(response, stream); + } + + @Override + public void close() throws IOException { + unclosedStreams + .values() + .forEach( + s -> { + try { + s.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + } + + static class ZeroCopyReadinessChecker { + private static final boolean isZeroCopyReady; + + static { + // Check whether io.grpc.Detachable exists? + boolean detachableClassExists = false; + try { + // Try to load Detachable interface in the package where KnownLength is in. + // This can be done directly by looking up io.grpc.Detachable but rather + // done indirectly to handle the case where gRPC is being shaded in a + // different package. + String knownLengthClassName = KnownLength.class.getName(); + String detachableClassName = + knownLengthClassName.substring(0, knownLengthClassName.lastIndexOf('.') + 1) + + "Detachable"; + Class detachableClass = Class.forName(detachableClassName); + detachableClassExists = (detachableClass != null); + } catch (ClassNotFoundException ex) { + // Should cause false to be returned + } + // Check whether com.google.protobuf.UnsafeByteOperations exists? + boolean unsafeByteOperationsClassExists = false; + try { + // Same above + String messageLiteClassName = MessageLite.class.getName(); + String unsafeByteOperationsClassName = + messageLiteClassName.substring(0, messageLiteClassName.lastIndexOf('.') + 1) + + "UnsafeByteOperations"; + System.out.println(unsafeByteOperationsClassName); + Class unsafeByteOperationsClass = Class.forName(unsafeByteOperationsClassName); + unsafeByteOperationsClassExists = (unsafeByteOperationsClass != null); + } catch (ClassNotFoundException ex) { + // Should cause false to be returned + } + isZeroCopyReady = detachableClassExists && unsafeByteOperationsClassExists; + } + + public static boolean isReady() { + return isZeroCopyReady; + } + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java index 06fca0413f..d7741b7d91 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java @@ -20,6 +20,7 @@ import com.google.common.hash.Hashing; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.function.Supplier; import javax.annotation.concurrent.Immutable; import org.checkerframework.checker.nullness.qual.Nullable; @@ -36,6 +37,8 @@ default Crc32cLengthKnown hash(Supplier b) { void validate(Crc32cValue expected, Supplier b) throws IOException; + void validate(Crc32cValue expected, List buffers) throws IOException; + @Nullable Crc32cLengthKnown nullSafeConcat(Crc32cLengthKnown r1, Crc32cLengthKnown r2); @@ -61,6 +64,9 @@ public Crc32cLengthKnown hash(ByteBuffer b) { @Override public void validate(Crc32cValue expected, Supplier b) {} + @Override + public void validate(Crc32cValue expected, List b) {} + @Override public @Nullable Crc32cLengthKnown nullSafeConcat(Crc32cLengthKnown r1, Crc32cLengthKnown r2) { return null; @@ -79,7 +85,24 @@ public Crc32cLengthKnown hash(ByteBuffer b) { return Crc32cValue.of(Hashing.crc32c().hashBytes(b).asInt(), remaining); } - @SuppressWarnings("ConstantConditions") + @SuppressWarnings({"ConstantConditions", "UnstableApiUsage"}) + @Override + public void validate(Crc32cValue expected, List b) throws IOException { + long remaining = 0; + com.google.common.hash.Hasher crc32c = Hashing.crc32c().newHasher(); + for (ByteBuffer tmp : b) { + remaining += tmp.remaining(); + crc32c.putBytes(tmp); + } + Crc32cLengthKnown actual = Crc32cValue.of(crc32c.hash().asInt(), remaining); + if (!actual.eqValue(expected)) { + throw new IOException( + String.format( + "Mismatch checksum value. Expected %s actual %s", + expected.debugString(), actual.debugString())); + } + } + @Override public void validate(Crc32cValue expected, Supplier b) throws IOException { Crc32cLengthKnown actual = hash(b); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java index ba9a972cc2..3a4fd7e924 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java @@ -127,7 +127,8 @@ public void readRetriesAreProperlyOrdered_readLargerThanMessageSize() .withDefaultCallContext( contextWithRetryForCodes(StatusCode.Code.DATA_LOSS)), start, - Hasher.noop())); + Hasher.noop(), + ResponseContentLifecycleManager.noop())); byte[] actualBytes = new byte[40]; try (UnbufferedReadableByteChannel c = session.open()) { c.read(ByteBuffer.wrap(actualBytes)); @@ -155,7 +156,8 @@ public void readRetriesAreProperlyOrdered_readSmallerThanMessageSize() .withDefaultCallContext( contextWithRetryForCodes(StatusCode.Code.DATA_LOSS)), start, - Hasher.noop())); + Hasher.noop(), + ResponseContentLifecycleManager.noop())); byte[] actualBytes = new byte[40]; ImmutableList buffers = TestUtils.subDivide(actualBytes, 2); try (UnbufferedReadableByteChannel c = session.open()) { @@ -213,7 +215,8 @@ public void readObject( .withDefaultCallContext( contextWithRetryForCodes(StatusCode.Code.DATA_LOSS)), start, - Hasher.noop())); + Hasher.noop(), + ResponseContentLifecycleManager.noop())); byte[] actualBytes = new byte[40]; try (UnbufferedReadableByteChannel c = session.open()) { IOException ioException = @@ -260,7 +263,8 @@ public void readObject( .withDefaultCallContext( contextWithRetryForCodes(StatusCode.Code.DATA_LOSS)), start, - Hasher.enabled())); + Hasher.enabled(), + ResponseContentLifecycleManager.noop())); byte[] actualBytes = new byte[40]; try (UnbufferedReadableByteChannel c = session.open()) { IOException ioException = @@ -299,7 +303,8 @@ public void readObject( .withDefaultCallContext( contextWithRetryForCodes(StatusCode.Code.DATA_LOSS)), start, - Hasher.enabled())); + Hasher.enabled(), + ResponseContentLifecycleManager.noop())); byte[] actualBytes = new byte[41]; //noinspection resource UnbufferedReadableByteChannel c = session.open(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java index 2e8efa0589..41f6df1de4 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java @@ -124,7 +124,9 @@ public void autoGzipDecompress_true() throws IOException { UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() .read() - .byteChannel(storageClient.getInstance().readObjectCallable()) + .byteChannel( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()) .setHasher(Hasher.noop()) .setAutoGzipDecompression(true) .unbuffered() @@ -143,7 +145,9 @@ public void autoGzipDecompress_false() throws IOException { UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() .read() - .byteChannel(storageClient.getInstance().readObjectCallable()) + .byteChannel( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()) .setHasher(Hasher.noop()) .setAutoGzipDecompression(false) .unbuffered() @@ -193,7 +197,9 @@ public void autoGzipDecompress_true() throws IOException { UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() .read() - .byteChannel(storageClient.getInstance().readObjectCallable()) + .byteChannel( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()) .setHasher(Hasher.noop()) .setAutoGzipDecompression(true) .unbuffered() @@ -212,7 +218,9 @@ public void autoGzipDecompress_false() throws IOException { UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() .read() - .byteChannel(storageClient.getInstance().readObjectCallable()) + .byteChannel( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()) .setHasher(Hasher.noop()) .setAutoGzipDecompression(false) .unbuffered() @@ -231,7 +239,9 @@ public void autoGzipDecompress_default_disabled() throws IOException { UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() .read() - .byteChannel(storageClient.getInstance().readObjectCallable()) + .byteChannel( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()) .setHasher(Hasher.noop()) .unbuffered() .setReadObjectRequest(reqCompressed) @@ -314,7 +324,7 @@ public void readObject( ReadableByteChannelSession session = ResumableMedia.gapic() .read() - .byteChannel(sc.readObjectCallable()) + .byteChannel(sc.readObjectCallable(), ResponseContentLifecycleManager.noop()) .setHasher(Hasher.noop()) .setAutoGzipDecompression(true) .unbuffered() diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java index 76219beccd..32d308a311 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java @@ -38,7 +38,9 @@ public void verifyUnsupportedMethodsGenerateMeaningfulException() { .setCredentials(NoCredentials.getInstance()) .build(); @SuppressWarnings("resource") - Storage s = new GrpcStorageImpl(options, null, null, Opts.empty()); + Storage s = + new GrpcStorageImpl( + options, null, ResponseContentLifecycleManager.noop(), null, Opts.empty()); ImmutableList messages = Stream.>of( s::batch, diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java new file mode 100644 index 0000000000..88c5b674c4 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java @@ -0,0 +1,121 @@ +package com.google.cloud.storage; + +import static com.google.cloud.storage.TestUtils.getChecksummedData; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.storage.GrpcStorageOptions.ReadObjectResponseZeroCopyMessageMarshaller; +import com.google.common.hash.Hashing; +import com.google.protobuf.ByteString; +import com.google.storage.v2.ContentRange; +import com.google.storage.v2.Object; +import com.google.storage.v2.ObjectChecksums; +import com.google.storage.v2.ReadObjectResponse; +import io.grpc.StatusRuntimeException; +import io.grpc.internal.ReadableBuffer; +import io.grpc.internal.ReadableBuffers; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Test; + +public class ZeroCopyMarshallerTest { + private final byte[] bytes = DataGenerator.base64Characters().genBytes(40); + private final ByteString data = ByteString.copyFrom(bytes, 0, 10); + private final ReadObjectResponse RESPONSE = + ReadObjectResponse.newBuilder() + .setMetadata( + Object.newBuilder() + .setName("name") + .setGeneration(3L) + .setContentType("application/octet-stream") + .build()) + .setContentRange(ContentRange.newBuilder().setStart(0).build()) + .setObjectChecksums( + ObjectChecksums.newBuilder().setCrc32C(Hashing.crc32c().hashBytes(bytes).asInt())) + .setChecksummedData(getChecksummedData(data, Hasher.enabled())) + .build(); + + private ReadObjectResponseZeroCopyMessageMarshaller createMarshaller() { + return new ReadObjectResponseZeroCopyMessageMarshaller(ReadObjectResponse.getDefaultInstance()); + } + + private byte[] dropLastOneByte(byte[] bytes) { + return Arrays.copyOfRange(bytes, 0, bytes.length - 1); + } + + private InputStream createInputStream(byte[] bytes, boolean isZeroCopyable) { + ReadableBuffer buffer = + isZeroCopyable ? ReadableBuffers.wrap(ByteBuffer.wrap(bytes)) : ReadableBuffers.wrap(bytes); + return ReadableBuffers.openStream(buffer, true); + } + + @Test + public void testParseOnFastPath() throws IOException { + InputStream stream = createInputStream(RESPONSE.toByteArray(), true); + ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller(); + ReadObjectResponse response = marshaller.parse(stream); + assertEquals(response, RESPONSE); + ResponseContentLifecycleHandle stream2 = marshaller.get(response); + assertNotNull(stream2); + stream2.close(); + ResponseContentLifecycleHandle stream3 = marshaller.get(response); + assertNotNull(stream3); + stream3.close(); + } + + @Test + public void testParseOnSlowPath() throws IOException { + InputStream stream = createInputStream(RESPONSE.toByteArray(), false); + ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller(); + ReadObjectResponse response = marshaller.parse(stream); + assertEquals(response, RESPONSE); + ResponseContentLifecycleHandle stream2 = marshaller.get(response); + assertNotNull(stream2); + stream2.close(); + } + + @Test + public void testParseBrokenMessageOnFastPath() { + InputStream stream = createInputStream(dropLastOneByte(RESPONSE.toByteArray()), true); + ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller(); + assertThrows( + StatusRuntimeException.class, + () -> { + marshaller.parse(stream); + }); + } + + @Test + public void testParseBrokenMessageOnSlowPath() { + InputStream stream = createInputStream(dropLastOneByte(RESPONSE.toByteArray()), false); + ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller(); + assertThrows( + StatusRuntimeException.class, + () -> { + marshaller.parse(stream); + }); + } + + @Test + public void testResponseContentLifecycleHandle() throws IOException { + System.out.println(GrpcStorageOptions.ZeroCopyReadinessChecker.isReady()); + AtomicBoolean wasClosedCalled = new AtomicBoolean(false); + Closeable verifyClosed = () -> wasClosedCalled.set(true); + + ResponseContentLifecycleHandle handle = + new ResponseContentLifecycleHandle(RESPONSE, verifyClosed); + handle.close(); + + assertTrue(wasClosedCalled.get()); + + ResponseContentLifecycleHandle nullHandle = new ResponseContentLifecycleHandle(RESPONSE, null); + nullHandle.close(); + // No NullPointerException means test passes + } +} From 4bf0f4e4d0838b8405243d2ec97fde9a508e2a2e Mon Sep 17 00:00:00 2001 From: JesseLovelace <43148100+JesseLovelace@users.noreply.github.com> Date: Thu, 4 Apr 2024 14:01:35 -0700 Subject: [PATCH 02/11] Update google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java Co-authored-by: BenWhitehead --- .../main/java/com/google/cloud/storage/GrpcStorageOptions.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index ed03e553aa..9b6a29756d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -1031,7 +1031,6 @@ static class ZeroCopyReadinessChecker { String unsafeByteOperationsClassName = messageLiteClassName.substring(0, messageLiteClassName.lastIndexOf('.') + 1) + "UnsafeByteOperations"; - System.out.println(unsafeByteOperationsClassName); Class unsafeByteOperationsClass = Class.forName(unsafeByteOperationsClassName); unsafeByteOperationsClassExists = (unsafeByteOperationsClass != null); } catch (ClassNotFoundException ex) { From 6f0d0c71cdc4f4b5ed2ba02e3361719ea23baa0f Mon Sep 17 00:00:00 2001 From: Jesse Lovelace Date: Fri, 5 Apr 2024 11:04:54 -0700 Subject: [PATCH 03/11] add copyright header --- .../cloud/storage/ZeroCopyMarshallerTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java index 88c5b674c4..c777aa5644 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.google.cloud.storage; import static com.google.cloud.storage.TestUtils.getChecksummedData; From 7b41aa165d04c9fd33cc48d7235e734059f8fe6f Mon Sep 17 00:00:00 2001 From: JesseLovelace <43148100+JesseLovelace@users.noreply.github.com> Date: Wed, 10 Apr 2024 09:52:31 -0700 Subject: [PATCH 04/11] Apply suggestions from code review Co-authored-by: BenWhitehead --- .../java/com/google/cloud/storage/GrpcStorageOptions.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index 9b6a29756d..d5c34fcd33 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -1003,7 +1003,7 @@ public void close() throws IOException { } } - static class ZeroCopyReadinessChecker { + static final class ZeroCopyReadinessChecker { private static final boolean isZeroCopyReady; static { @@ -1021,7 +1021,7 @@ static class ZeroCopyReadinessChecker { Class detachableClass = Class.forName(detachableClassName); detachableClassExists = (detachableClass != null); } catch (ClassNotFoundException ex) { - // Should cause false to be returned + // leaves detachableClassExists false } // Check whether com.google.protobuf.UnsafeByteOperations exists? boolean unsafeByteOperationsClassExists = false; @@ -1034,7 +1034,7 @@ static class ZeroCopyReadinessChecker { Class unsafeByteOperationsClass = Class.forName(unsafeByteOperationsClassName); unsafeByteOperationsClassExists = (unsafeByteOperationsClass != null); } catch (ClassNotFoundException ex) { - // Should cause false to be returned + // leaves unsafeByteOperationsClassExists false } isZeroCopyReady = detachableClassExists && unsafeByteOperationsClassExists; } From 4ad2a620db2c926f1823ed9d2d5d505aa0f0f090 Mon Sep 17 00:00:00 2001 From: Jesse Lovelace Date: Wed, 10 Apr 2024 13:50:04 -0700 Subject: [PATCH 05/11] Extract classes to own files, more StorageClient initialization --- .../GapicUnbufferedReadableByteChannel.java | 80 ------------------- .../cloud/storage/GrpcStorageOptions.java | 5 +- .../com/google/cloud/storage/ReadCursor.java | 37 +++++++++ .../ResponseContentLifecycleHandle.java | 44 ++++++++++ .../ResponseContentLifecycleManager.java | 16 ++++ 5 files changed, 99 insertions(+), 83 deletions(-) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index c08e633d2e..c52fbacc68 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -232,83 +232,3 @@ private Iterator ensureResponseIteratorOpen() { } } -/** - * Shrink wraps a beginning, offset and limit for tracking state of an individual invocation of - * {@link #read} - */ -final class ReadCursor { - private final long beginning; - private long offset; - private final long limit; - - ReadCursor(long beginning, long limit) { - this.limit = limit; - this.beginning = beginning; - this.offset = beginning; - } - - public boolean hasRemaining() { - return limit - offset > 0; - } - - public void advance(long incr) { - checkArgument(incr >= 0); - offset += incr; - } - - public long read() { - return offset - beginning; - } - - @Override - public String toString() { - return String.format("ReadCursor{begin=%d, offset=%d, limit=%d}", beginning, offset, limit); - } -} - -interface ResponseContentLifecycleManager { - ResponseContentLifecycleHandle get(ReadObjectResponse response); - - static ResponseContentLifecycleManager noop() { - return response -> - new ResponseContentLifecycleHandle( - response, - () -> { - // no-op - }); - } -} - -final class ResponseContentLifecycleHandle implements Closeable { - @Nullable private final Closeable dispose; - - private final List buffers; - - ResponseContentLifecycleHandle(ReadObjectResponse resp, @Nullable Closeable dispose) { - this.dispose = dispose; - - this.buffers = resp.getChecksummedData().getContent().asReadOnlyByteBufferList(); - } - - void copy(ReadCursor c, ByteBuffer[] dsts, int offset, int length) { - for (ByteBuffer b : buffers) { - long copiedBytes = Buffers.copy(b, dsts, offset, length); - c.advance(copiedBytes); - if (b.hasRemaining()) break; - } - } - - boolean hasRemaining() { - for (ByteBuffer b : buffers) { - if (b.hasRemaining()) return true; - } - return false; - } - - @Override - public void close() throws IOException { - if (dispose != null) { - dispose.close(); - } - } -} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index d5c34fcd33..e858ad4cca 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -688,7 +688,6 @@ public Storage create(StorageOptions options) { Tuple> t = grpcStorageOptions.resolveSettingsAndOpts(); StorageSettings storageSettings = t.x(); Opts defaultOpts = t.y(); - StorageClient client; if (ZeroCopyReadinessChecker.isReady()) { StorageStubSettings stubSettings = (StorageStubSettings) storageSettings.getStubSettings(); @@ -698,7 +697,7 @@ public Storage create(StorageOptions options) { InternalZeroCopyGrpcStorageStub stub = new InternalZeroCopyGrpcStorageStub( stubSettings, clientContext, grpcStorageCallableFactory); - client = new InternalStorageClient(stub); + StorageClient client = new InternalStorageClient(stub); return new GrpcStorageImpl( grpcStorageOptions, client, @@ -706,7 +705,7 @@ public Storage create(StorageOptions options) { grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()), defaultOpts); } else { - client = StorageClient.create(storageSettings); + StorageClient client = StorageClient.create(storageSettings); return new GrpcStorageImpl( grpcStorageOptions, client, diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java new file mode 100644 index 0000000000..2d6e3bb052 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java @@ -0,0 +1,37 @@ +package com.google.cloud.storage; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Shrink wraps a beginning, offset and limit for tracking state of an individual invocation of + * {@link #read} + */ +final class ReadCursor { + private final long beginning; + private long offset; + private final long limit; + + ReadCursor(long beginning, long limit) { + this.limit = limit; + this.beginning = beginning; + this.offset = beginning; + } + + public boolean hasRemaining() { + return limit - offset > 0; + } + + public void advance(long incr) { + checkArgument(incr >= 0); + offset += incr; + } + + public long read() { + return offset - beginning; + } + + @Override + public String toString() { + return String.format("ReadCursor{begin=%d, offset=%d, limit=%d}", beginning, offset, limit); + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java new file mode 100644 index 0000000000..ac32a930c4 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java @@ -0,0 +1,44 @@ +package com.google.cloud.storage; + +import com.google.storage.v2.ReadObjectResponse; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +final class ResponseContentLifecycleHandle implements Closeable { + @Nullable + private final Closeable dispose; + + private final List buffers; + + ResponseContentLifecycleHandle(ReadObjectResponse resp, @Nullable Closeable dispose) { + this.dispose = dispose; + + this.buffers = resp.getChecksummedData().getContent().asReadOnlyByteBufferList(); + } + + void copy(ReadCursor c, ByteBuffer[] dsts, int offset, int length) { + for (ByteBuffer b : buffers) { + long copiedBytes = Buffers.copy(b, dsts, offset, length); + c.advance(copiedBytes); + if (b.hasRemaining()) break; + } + } + + boolean hasRemaining() { + for (ByteBuffer b : buffers) { + if (b.hasRemaining()) return true; + } + return false; + } + + @Override + public void close() throws IOException { + if (dispose != null) { + dispose.close(); + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java new file mode 100644 index 0000000000..f5519b2a97 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java @@ -0,0 +1,16 @@ +package com.google.cloud.storage; + +import com.google.storage.v2.ReadObjectResponse; + +interface ResponseContentLifecycleManager { + ResponseContentLifecycleHandle get(ReadObjectResponse response); + + static ResponseContentLifecycleManager noop() { + return response -> + new ResponseContentLifecycleHandle( + response, + () -> { + // no-op + }); + } +} From 3eef08c01aa50e5a1b01b8c53d170b1d7bbd4aa4 Mon Sep 17 00:00:00 2001 From: Jesse Lovelace Date: Wed, 10 Apr 2024 15:49:32 -0700 Subject: [PATCH 06/11] copyright headers on new files --- .../java/com/google/cloud/storage/ReadCursor.java | 15 +++++++++++++++ .../storage/ResponseContentLifecycleHandle.java | 15 +++++++++++++++ .../storage/ResponseContentLifecycleManager.java | 15 +++++++++++++++ 3 files changed, 45 insertions(+) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java index 2d6e3bb052..697bfa3e96 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java @@ -1,3 +1,18 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.google.cloud.storage; import static com.google.common.base.Preconditions.checkArgument; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java index ac32a930c4..ba23387532 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java @@ -1,3 +1,18 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.google.cloud.storage; import com.google.storage.v2.ReadObjectResponse; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java index f5519b2a97..81daf20772 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java @@ -1,3 +1,18 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.google.cloud.storage; import com.google.storage.v2.ReadObjectResponse; From 07efa6c5e3eb67ee9f9220fe9aa4a819c3f77703 Mon Sep 17 00:00:00 2001 From: Jesse Lovelace Date: Wed, 10 Apr 2024 15:56:56 -0700 Subject: [PATCH 07/11] formatter --- .../GapicUnbufferedReadableByteChannel.java | 4 -- .../com/google/cloud/storage/ReadCursor.java | 44 ++++++++--------- .../ResponseContentLifecycleHandle.java | 48 +++++++++---------- .../ResponseContentLifecycleManager.java | 18 +++---- 4 files changed, 54 insertions(+), 60 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index c52fbacc68..f4cb74fdf2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -16,7 +16,6 @@ package com.google.cloud.storage; -import static com.google.common.base.Preconditions.checkArgument; import com.google.api.client.http.HttpStatusCodes; import com.google.api.core.ApiFuture; @@ -38,8 +37,6 @@ import java.nio.channels.ScatteringByteChannel; import java.util.Arrays; import java.util.Iterator; -import java.util.List; -import org.checkerframework.checker.nullness.qual.Nullable; final class GapicUnbufferedReadableByteChannel implements UnbufferedReadableByteChannel, ScatteringByteChannel { @@ -231,4 +228,3 @@ private Iterator ensureResponseIteratorOpen() { } } } - diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java index 697bfa3e96..b65a6e257b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java @@ -22,31 +22,31 @@ * {@link #read} */ final class ReadCursor { - private final long beginning; - private long offset; - private final long limit; + private final long beginning; + private long offset; + private final long limit; - ReadCursor(long beginning, long limit) { - this.limit = limit; - this.beginning = beginning; - this.offset = beginning; - } + ReadCursor(long beginning, long limit) { + this.limit = limit; + this.beginning = beginning; + this.offset = beginning; + } - public boolean hasRemaining() { - return limit - offset > 0; - } + public boolean hasRemaining() { + return limit - offset > 0; + } - public void advance(long incr) { - checkArgument(incr >= 0); - offset += incr; - } + public void advance(long incr) { + checkArgument(incr >= 0); + offset += incr; + } - public long read() { - return offset - beginning; - } + public long read() { + return offset - beginning; + } - @Override - public String toString() { - return String.format("ReadCursor{begin=%d, offset=%d, limit=%d}", beginning, offset, limit); - } + @Override + public String toString() { + return String.format("ReadCursor{begin=%d, offset=%d, limit=%d}", beginning, offset, limit); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java index ba23387532..20fc365832 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java @@ -16,44 +16,42 @@ package com.google.cloud.storage; import com.google.storage.v2.ReadObjectResponse; -import org.checkerframework.checker.nullness.qual.Nullable; - import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import org.checkerframework.checker.nullness.qual.Nullable; final class ResponseContentLifecycleHandle implements Closeable { - @Nullable - private final Closeable dispose; + @Nullable private final Closeable dispose; - private final List buffers; + private final List buffers; - ResponseContentLifecycleHandle(ReadObjectResponse resp, @Nullable Closeable dispose) { - this.dispose = dispose; + ResponseContentLifecycleHandle(ReadObjectResponse resp, @Nullable Closeable dispose) { + this.dispose = dispose; - this.buffers = resp.getChecksummedData().getContent().asReadOnlyByteBufferList(); - } + this.buffers = resp.getChecksummedData().getContent().asReadOnlyByteBufferList(); + } - void copy(ReadCursor c, ByteBuffer[] dsts, int offset, int length) { - for (ByteBuffer b : buffers) { - long copiedBytes = Buffers.copy(b, dsts, offset, length); - c.advance(copiedBytes); - if (b.hasRemaining()) break; - } + void copy(ReadCursor c, ByteBuffer[] dsts, int offset, int length) { + for (ByteBuffer b : buffers) { + long copiedBytes = Buffers.copy(b, dsts, offset, length); + c.advance(copiedBytes); + if (b.hasRemaining()) break; } + } - boolean hasRemaining() { - for (ByteBuffer b : buffers) { - if (b.hasRemaining()) return true; - } - return false; + boolean hasRemaining() { + for (ByteBuffer b : buffers) { + if (b.hasRemaining()) return true; } + return false; + } - @Override - public void close() throws IOException { - if (dispose != null) { - dispose.close(); - } + @Override + public void close() throws IOException { + if (dispose != null) { + dispose.close(); } + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java index 81daf20772..3236513398 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java @@ -18,14 +18,14 @@ import com.google.storage.v2.ReadObjectResponse; interface ResponseContentLifecycleManager { - ResponseContentLifecycleHandle get(ReadObjectResponse response); + ResponseContentLifecycleHandle get(ReadObjectResponse response); - static ResponseContentLifecycleManager noop() { - return response -> - new ResponseContentLifecycleHandle( - response, - () -> { - // no-op - }); - } + static ResponseContentLifecycleManager noop() { + return response -> + new ResponseContentLifecycleHandle( + response, + () -> { + // no-op + }); + } } From fd01b5ca439f759a1f23fbe6335d662ec49b304e Mon Sep 17 00:00:00 2001 From: Jesse Lovelace Date: Wed, 10 Apr 2024 15:59:19 -0700 Subject: [PATCH 08/11] one more lint issue --- .../google/cloud/storage/GapicUnbufferedReadableByteChannel.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index f4cb74fdf2..6cecc8dded 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -16,7 +16,6 @@ package com.google.cloud.storage; - import com.google.api.client.http.HttpStatusCodes; import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; From 172f6ff81787c8f89a9ced64bd9d1462fbd78254 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Thu, 11 Apr 2024 15:04:25 -0400 Subject: [PATCH 09/11] fix: improve GrpcStorageOptions.ReadObjectResponseZeroCopyMessageMarshaller#close() handling of multiple IOExceptions --- .../cloud/storage/GrpcStorageOptions.java | 49 ++++++++--- .../cloud/storage/ZeroCopyMarshallerTest.java | 88 +++++++++++++++++++ 2 files changed, 126 insertions(+), 11 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index e858ad4cca..438b5fd595 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -84,18 +84,19 @@ import java.io.IOException; import java.io.InputStream; import java.io.Serializable; -import java.io.UncheckedIOException; import java.net.URI; import java.nio.ByteBuffer; import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.IdentityHashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -989,16 +990,42 @@ public ResponseContentLifecycleHandle get(ReadObjectResponse response) { @Override public void close() throws IOException { - unclosedStreams - .values() - .forEach( - s -> { - try { - s.close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); + closeAllStreams(unclosedStreams.values()); + } + + /** + * In the event closing the streams results in multiple streams throwing IOExceptions, collect + * them all as suppressed exceptions on the first occurrence. + */ + @VisibleForTesting + static void closeAllStreams(Collection inputStreams) throws IOException { + IOException ioException = + inputStreams.stream() + .map( + stream -> { + try { + stream.close(); + return null; + } catch (IOException e) { + return e; + } + }) + .filter(Objects::nonNull) + .reduce( + null, + (l, r) -> { + if (l != null) { + l.addSuppressed(r); + return l; + } else { + return r; + } + }, + (l, r) -> l); + + if (ioException != null) { + throw ioException; + } } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java index c777aa5644..a754b18fad 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java @@ -16,12 +16,14 @@ package com.google.cloud.storage; import static com.google.cloud.storage.TestUtils.getChecksummedData; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import com.google.cloud.storage.GrpcStorageOptions.ReadObjectResponseZeroCopyMessageMarshaller; +import com.google.common.collect.ImmutableList; import com.google.common.hash.Hashing; import com.google.protobuf.ByteString; import com.google.storage.v2.ContentRange; @@ -32,11 +34,14 @@ import io.grpc.internal.ReadableBuffer; import io.grpc.internal.ReadableBuffers; import java.io.Closeable; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.junit.Test; public class ZeroCopyMarshallerTest { @@ -133,4 +138,87 @@ public void testResponseContentLifecycleHandle() throws IOException { nullHandle.close(); // No NullPointerException means test passes } + + @Test + public void testMarshallerClose_clean() throws IOException { + CloseAuditingInputStream stream1 = + CloseAuditingInputStream.of(createInputStream(RESPONSE.toByteArray(), true)); + CloseAuditingInputStream stream2 = + CloseAuditingInputStream.of(createInputStream(RESPONSE.toByteArray(), true)); + CloseAuditingInputStream stream3 = + CloseAuditingInputStream.of(createInputStream(RESPONSE.toByteArray(), true)); + + ReadObjectResponseZeroCopyMessageMarshaller.closeAllStreams( + ImmutableList.of(stream1, stream2, stream3)); + + assertThat(stream1.closed).isTrue(); + assertThat(stream2.closed).isTrue(); + assertThat(stream3.closed).isTrue(); + } + + @SuppressWarnings("resource") + @Test + public void testMarshallerClose_multipleIoExceptions() { + CloseAuditingInputStream stream1 = + new CloseAuditingInputStream(null) { + @Override + void onClose() throws IOException { + throw new IOException("Kaboom stream1"); + } + }; + CloseAuditingInputStream stream2 = + new CloseAuditingInputStream(null) { + @Override + void onClose() throws IOException { + throw new IOException("Kaboom stream2"); + } + }; + CloseAuditingInputStream stream3 = + new CloseAuditingInputStream(null) { + @Override + void onClose() throws IOException { + throw new IOException("Kaboom stream3"); + } + }; + + IOException ioException = + assertThrows( + IOException.class, + () -> + ReadObjectResponseZeroCopyMessageMarshaller.closeAllStreams( + ImmutableList.of(stream1, stream2, stream3))); + + assertThat(stream1.closed).isTrue(); + assertThat(stream2.closed).isTrue(); + assertThat(stream3.closed).isTrue(); + + assertThat(ioException).hasMessageThat().isEqualTo("Kaboom stream1"); + List messages = + Arrays.stream(ioException.getSuppressed()) + .map(Throwable::getMessage) + .collect(Collectors.toList()); + assertThat(messages).isEqualTo(ImmutableList.of("Kaboom stream2", "Kaboom stream3")); + } + + private static class CloseAuditingInputStream extends FilterInputStream { + + private boolean closed = false; + + private CloseAuditingInputStream(InputStream in) { + super(in); + } + + public static CloseAuditingInputStream of(InputStream in) { + return new CloseAuditingInputStream(in); + } + + @Override + public void close() throws IOException { + closed = true; + onClose(); + super.close(); + } + + void onClose() throws IOException {} + } } From 84eb48eed62838a9e609b191f1fcf3b6cbe84741 Mon Sep 17 00:00:00 2001 From: Jesse Lovelace Date: Thu, 11 Apr 2024 12:14:37 -0700 Subject: [PATCH 10/11] clean up ZeroCopyMarshallerTest --- .../cloud/storage/ZeroCopyMarshallerTest.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java index a754b18fad..57d96dfbca 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java @@ -47,7 +47,7 @@ public class ZeroCopyMarshallerTest { private final byte[] bytes = DataGenerator.base64Characters().genBytes(40); private final ByteString data = ByteString.copyFrom(bytes, 0, 10); - private final ReadObjectResponse RESPONSE = + private final ReadObjectResponse response = ReadObjectResponse.newBuilder() .setMetadata( Object.newBuilder() @@ -77,10 +77,10 @@ private InputStream createInputStream(byte[] bytes, boolean isZeroCopyable) { @Test public void testParseOnFastPath() throws IOException { - InputStream stream = createInputStream(RESPONSE.toByteArray(), true); + InputStream stream = createInputStream(response.toByteArray(), true); ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller(); ReadObjectResponse response = marshaller.parse(stream); - assertEquals(response, RESPONSE); + assertEquals(response, this.response); ResponseContentLifecycleHandle stream2 = marshaller.get(response); assertNotNull(stream2); stream2.close(); @@ -91,10 +91,10 @@ public void testParseOnFastPath() throws IOException { @Test public void testParseOnSlowPath() throws IOException { - InputStream stream = createInputStream(RESPONSE.toByteArray(), false); + InputStream stream = createInputStream(response.toByteArray(), false); ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller(); ReadObjectResponse response = marshaller.parse(stream); - assertEquals(response, RESPONSE); + assertEquals(response, this.response); ResponseContentLifecycleHandle stream2 = marshaller.get(response); assertNotNull(stream2); stream2.close(); @@ -102,7 +102,7 @@ public void testParseOnSlowPath() throws IOException { @Test public void testParseBrokenMessageOnFastPath() { - InputStream stream = createInputStream(dropLastOneByte(RESPONSE.toByteArray()), true); + InputStream stream = createInputStream(dropLastOneByte(response.toByteArray()), true); ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller(); assertThrows( StatusRuntimeException.class, @@ -113,7 +113,7 @@ public void testParseBrokenMessageOnFastPath() { @Test public void testParseBrokenMessageOnSlowPath() { - InputStream stream = createInputStream(dropLastOneByte(RESPONSE.toByteArray()), false); + InputStream stream = createInputStream(dropLastOneByte(response.toByteArray()), false); ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller(); assertThrows( StatusRuntimeException.class, @@ -124,17 +124,16 @@ public void testParseBrokenMessageOnSlowPath() { @Test public void testResponseContentLifecycleHandle() throws IOException { - System.out.println(GrpcStorageOptions.ZeroCopyReadinessChecker.isReady()); AtomicBoolean wasClosedCalled = new AtomicBoolean(false); Closeable verifyClosed = () -> wasClosedCalled.set(true); ResponseContentLifecycleHandle handle = - new ResponseContentLifecycleHandle(RESPONSE, verifyClosed); + new ResponseContentLifecycleHandle(response, verifyClosed); handle.close(); assertTrue(wasClosedCalled.get()); - ResponseContentLifecycleHandle nullHandle = new ResponseContentLifecycleHandle(RESPONSE, null); + ResponseContentLifecycleHandle nullHandle = new ResponseContentLifecycleHandle(response, null); nullHandle.close(); // No NullPointerException means test passes } @@ -142,11 +141,11 @@ public void testResponseContentLifecycleHandle() throws IOException { @Test public void testMarshallerClose_clean() throws IOException { CloseAuditingInputStream stream1 = - CloseAuditingInputStream.of(createInputStream(RESPONSE.toByteArray(), true)); + CloseAuditingInputStream.of(createInputStream(response.toByteArray(), true)); CloseAuditingInputStream stream2 = - CloseAuditingInputStream.of(createInputStream(RESPONSE.toByteArray(), true)); + CloseAuditingInputStream.of(createInputStream(response.toByteArray(), true)); CloseAuditingInputStream stream3 = - CloseAuditingInputStream.of(createInputStream(RESPONSE.toByteArray(), true)); + CloseAuditingInputStream.of(createInputStream(response.toByteArray(), true)); ReadObjectResponseZeroCopyMessageMarshaller.closeAllStreams( ImmutableList.of(stream1, stream2, stream3)); From 1d88419242e85864970051486e509eac09f5b398 Mon Sep 17 00:00:00 2001 From: Jesse Lovelace Date: Thu, 11 Apr 2024 14:11:45 -0700 Subject: [PATCH 11/11] add gprc core to pom --- google-cloud-storage/pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google-cloud-storage/pom.xml b/google-cloud-storage/pom.xml index 87deadffb5..c6219b776d 100644 --- a/google-cloud-storage/pom.xml +++ b/google-cloud-storage/pom.xml @@ -96,6 +96,10 @@ com.google.protobuf protobuf-java-util + + io.grpc + grpc-core + io.grpc grpc-protobuf