Skip to content

Commit

Permalink
Merge pull request #771 from paulcwarren/feature/byte-range-optimization
Browse files Browse the repository at this point in the history
Optimize byte range requests to S3 Storage
  • Loading branch information
paulcwarren authored Feb 11, 2022
2 parents d7d9124 + b2b7208 commit 1aecd18
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.springframework.content.commons.io;

public interface RangeableResource {

/**
* An optional aspect for Resources that wish to be given Range specifications so that they
* can prepare InputStream appropriately
*
* @param range the range
*/
void setRange(String range);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.content.commons.io.RangeableResource;
import org.springframework.content.commons.mappingcontext.ContentProperty;
import org.springframework.content.commons.property.PropertyPath;
import org.springframework.content.commons.repository.AssociativeStore;
Expand Down Expand Up @@ -122,6 +123,10 @@ public void getContent(HttpServletRequest request, HttpServletResponse response,
}
}

if (resource instanceof RangeableResource) {
this.configureResourceForByteRangeRequest((RangeableResource)resource, headers);
}

request.setAttribute("SPRING_CONTENT_RESOURCE", resource);
request.setAttribute("SPRING_CONTENT_CONTENTTYPE", producedResourceType);
} catch (Exception e) {
Expand Down Expand Up @@ -344,4 +349,10 @@ public static boolean isClientAbortException(Exception e) {
}
return false;
}

private void configureResourceForByteRangeRequest(RangeableResource resource, HttpHeaders headers) {
if (headers.containsKey(HttpHeaders.RANGE)) {
resource.setRange(headers.getFirst(HttpHeaders.RANGE));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.content.commons.io.RangeableResource;
import org.springframework.content.commons.mappingcontext.ContentProperty;
import org.springframework.content.commons.property.PropertyPath;
import org.springframework.content.commons.repository.ContentStore;
Expand Down Expand Up @@ -124,6 +125,10 @@ public void getContent(HttpServletRequest request, HttpServletResponse response,
}
}

if (resource instanceof RangeableResource) {
this.configureResourceForByteRangeRequest((RangeableResource)resource, headers);
}

request.setAttribute("SPRING_CONTENT_RESOURCE", resource);
request.setAttribute("SPRING_CONTENT_CONTENTTYPE", producedResourceType);
} catch (Exception e) {
Expand Down Expand Up @@ -195,16 +200,6 @@ public void setContent(HttpServletRequest request, HttpServletResponse response,
}
}

private int indexOfContentArg(Class<?>[] paramTypes) {
for (int i=0; i < paramTypes.length; i++) {
if (InputStream.class.equals(paramTypes[i]) || Resource.class.equals(paramTypes[i])) {
return i;
}
}

return 0;
}

@Override
public void unsetContent(Resource resource) throws MethodNotAllowedException {

Expand Down Expand Up @@ -292,6 +287,22 @@ private boolean matchParameters(MediaType acceptedMediaType, MediaType producabl
return true;
}

private void configureResourceForByteRangeRequest(RangeableResource resource, HttpHeaders headers) {
if (headers.containsKey(HttpHeaders.RANGE)) {
resource.setRange(headers.getFirst(HttpHeaders.RANGE));
}
}

private int indexOfContentArg(Class<?>[] paramTypes) {
for (int i=0; i < paramTypes.length; i++) {
if (InputStream.class.equals(paramTypes[i]) || Resource.class.equals(paramTypes[i])) {
return i;
}
}

return 0;
}

public static StoreExportedMethodsMap getExportedMethodsFor(Class<?> storeInterfaceClass) {

StoreExportedMethodsMap exportMap = storeExportedMethods.get(storeInterfaceClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.content.commons.io.DeletableResource;
import org.springframework.content.commons.io.RangeableResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.WritableResource;
import org.springframework.http.HttpHeaders;
Expand Down Expand Up @@ -66,6 +67,10 @@ else if (acceptedMimeType.includes(resourceType)) {
}
}

if (resource instanceof RangeableResource) {
this.configureResourceForByteRangeRequest((RangeableResource)resource, headers);
}

request.setAttribute("SPRING_CONTENT_RESOURCE", resource);
request.setAttribute("SPRING_CONTENT_CONTENTTYPE", producedResourceType);
} catch (Exception e) {
Expand Down Expand Up @@ -115,4 +120,10 @@ public void unsetContent(Resource resource) {
}
}
}

private void configureResourceForByteRangeRequest(RangeableResource resource, HttpHeaders headers) {
if (headers.containsKey(HttpHeaders.RANGE)) {
resource.setRange(headers.getFirst(HttpHeaders.RANGE));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package internal.org.springframework.content.rest.io;

import org.springframework.content.commons.io.RangeableResource;
import org.springframework.content.commons.mappingcontext.ContentProperty;
import org.springframework.core.io.WritableResource;

public interface AssociatedStoreResource<S> extends WritableResource, StoreResource {
public interface AssociatedStoreResource<S> extends WritableResource, StoreResource, RangeableResource {

S getAssociation();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import javax.persistence.Version;

import org.springframework.content.commons.io.DeletableResource;
import org.springframework.content.commons.io.RangeableResource;
import org.springframework.content.commons.mappingcontext.ContentProperty;
import org.springframework.content.commons.property.PropertyPath;
import org.springframework.content.commons.renditions.Renderable;
Expand Down Expand Up @@ -273,4 +274,11 @@ public void delete()
throws IOException {
((DeletableResource)original).delete();
}

@Override
public void setRange(String range) {
if (original instanceof RangeableResource) {
((RangeableResource)original).setRange(range);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
import java.net.URL;

import org.springframework.content.commons.io.DeletableResource;
import org.springframework.content.commons.io.RangeableResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.WritableResource;
import org.springframework.util.Assert;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;

public class S3StoreResource implements WritableResource, DeletableResource {
public class S3StoreResource implements WritableResource, DeletableResource, RangeableResource {

private S3Client client;
private Resource delegate;
Expand Down Expand Up @@ -115,4 +116,9 @@ public boolean isWritable() {
public OutputStream getOutputStream() throws IOException {
return ((WritableResource) delegate).getOutputStream();
}

@Override
public void setRange(String range) {
((RangeableResource)delegate).setRange(range);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;

import org.springframework.content.commons.io.RangeableResource;
import org.springframework.core.io.AbstractResource;
import org.springframework.core.io.WritableResource;
import org.springframework.core.task.TaskExecutor;
Expand Down Expand Up @@ -50,7 +51,7 @@
* @author Alain Sahli
* @since 1.0
*/
public class SimpleStorageResource extends AbstractResource implements WritableResource {
public class SimpleStorageResource extends AbstractResource implements WritableResource, RangeableResource {

private final String bucketName;

Expand All @@ -64,6 +65,8 @@ public class SimpleStorageResource extends AbstractResource implements WritableR

private volatile HeadObjectResponse objectMetadata;

private String range;

public SimpleStorageResource(S3Client amazonS3, String bucketName, String objectName,
TaskExecutor taskExecutor) {
this(amazonS3, bucketName, objectName, taskExecutor, null);
Expand Down Expand Up @@ -93,13 +96,22 @@ public String getDescription() {
return builder.toString();
}

@Override
public void setRange(String range) {
this.range = range;
}

@Override
public InputStream getInputStream() throws IOException {
GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder()
.bucket(this.bucketName).key(this.objectName);
if (this.versionId != null) {
getObjectRequestBuilder.versionId(this.versionId);
}
if (this.range != null) {
getObjectRequestBuilder.range(range);
return new PartialContentInputStream(this.amazonS3.getObject(getObjectRequestBuilder.build()));
}
return this.amazonS3.getObject(getObjectRequestBuilder.build());
}

Expand Down Expand Up @@ -397,4 +409,90 @@ public CompletedPart call() throws Exception {

}

/**
* Create a new partial content input stream wrapper around the given delegate.
*
* The delegate is expected to be a "prepared" input stream onto a byte-range. Reading the
* input stream return the byte-range only.
*/
public static class PartialContentInputStream extends InputStream {

private InputStream delegate;

public PartialContentInputStream(InputStream delegate) {
this.delegate = delegate;
}

/**
* As this is a wrapper on a prepared input stream if the consumer asks us to skip
* just reply that we have.
*/
@Override
public long skip(long n)
throws IOException {
return n;
}

@Override
public int hashCode() {
return delegate.hashCode();
}

@Override
public int read(byte[] b)
throws IOException {
return delegate.read(b);
}

@Override
public boolean equals(Object obj) {
return delegate.equals(obj);
}

@Override
public int read()
throws IOException {

return delegate.read();
}

@Override
public int read(byte[] b, int off, int len)
throws IOException {
return delegate.read(b, off, len);
}

@Override
public String toString() {
return delegate.toString();
}

@Override
public int available()
throws IOException {
return delegate.available();
}

@Override
public void close()
throws IOException {
delegate.close();
}

@Override
public void mark(int readlimit) {
delegate.mark(readlimit);
}

@Override
public void reset()
throws IOException {
delegate.reset();
}

@Override
public boolean markSupported() {
return delegate.markSupported();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.springframework.content.commons.annotations.ContentId;
import org.springframework.content.commons.annotations.ContentLength;
import org.springframework.content.commons.io.DeletableResource;
import org.springframework.content.commons.io.RangeableResource;
import org.springframework.content.commons.property.PropertyPath;
import org.springframework.content.commons.repository.ContentStore;
import org.springframework.content.commons.repository.StoreAccessException;
Expand All @@ -60,6 +61,7 @@
import com.github.paulcwarren.ginkgo4j.Ginkgo4jConfiguration;
import com.github.paulcwarren.ginkgo4j.Ginkgo4jRunner;

import internal.org.springframework.content.s3.io.SimpleStorageResource;
import junit.framework.Assert;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -157,6 +159,10 @@ public class S3StoreIT {
assertThat(genericResource.exists(), is(false));
});

It("should be a RangeableResource", () -> {
assertThat(genericResource, is(instanceOf(RangeableResource.class)));
});

Context("given content is added to that resource", () -> {

BeforeEach(() -> {
Expand Down Expand Up @@ -200,6 +206,21 @@ public class S3StoreIT {
});
});

Context("given a byte range is requsted", () -> {

It("should return a partial content input stream and the partial content", () -> {

((RangeableResource)genericResource).setRange("bytes=6-19");

try (InputStream expected = new ByteArrayInputStream("Spring Content".getBytes())) {
try (InputStream actual = genericResource.getInputStream()) {
assertThat(actual, is(instanceOf(SimpleStorageResource.PartialContentInputStream.class)));
assertThat(IOUtils.toString(expected, "UTF-8"), is(IOUtils.toString(actual, "UTF-8")));
}
}
});
});

Context("given that resource is then deleted", () -> {

BeforeEach(() -> {
Expand Down

0 comments on commit 1aecd18

Please sign in to comment.