Skip to content

Commit

Permalink
Merge pull request #20965 from nextcloud/backport/20033/stable18
Browse files Browse the repository at this point in the history
[stable18] Enable fseek for files in S3 storage
  • Loading branch information
MorrisJobke authored May 25, 2020
2 parents 4e8f727 + 3d3ee1b commit 1d816ad
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 23 deletions.
1 change: 1 addition & 0 deletions lib/composer/composer/autoload_classmap.php
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,7 @@
'OC\\Files\\Storage\\Wrapper\\Wrapper' => $baseDir . '/lib/private/Files/Storage/Wrapper/Wrapper.php',
'OC\\Files\\Stream\\Encryption' => $baseDir . '/lib/private/Files/Stream/Encryption.php',
'OC\\Files\\Stream\\Quota' => $baseDir . '/lib/private/Files/Stream/Quota.php',
'OC\\Files\\Stream\\SeekableHttpStream' => $baseDir . '/lib/private/Files/Stream/SeekableHttpStream.php',
'OC\\Files\\Type\\Detection' => $baseDir . '/lib/private/Files/Type/Detection.php',
'OC\\Files\\Type\\Loader' => $baseDir . '/lib/private/Files/Type/Loader.php',
'OC\\Files\\Type\\TemplateManager' => $baseDir . '/lib/private/Files/Type/TemplateManager.php',
Expand Down
1 change: 1 addition & 0 deletions lib/composer/composer/autoload_static.php
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,7 @@ class ComposerStaticInit53792487c5a8370acc0b06b1a864ff4c
'OC\\Files\\Storage\\Wrapper\\Wrapper' => __DIR__ . '/../../..' . '/lib/private/Files/Storage/Wrapper/Wrapper.php',
'OC\\Files\\Stream\\Encryption' => __DIR__ . '/../../..' . '/lib/private/Files/Stream/Encryption.php',
'OC\\Files\\Stream\\Quota' => __DIR__ . '/../../..' . '/lib/private/Files/Stream/Quota.php',
'OC\\Files\\Stream\\SeekableHttpStream' => __DIR__ . '/../../..' . '/lib/private/Files/Stream/SeekableHttpStream.php',
'OC\\Files\\Type\\Detection' => __DIR__ . '/../../..' . '/lib/private/Files/Type/Detection.php',
'OC\\Files\\Type\\Loader' => __DIR__ . '/../../..' . '/lib/private/Files/Type/Loader.php',
'OC\\Files\\Type\\TemplateManager' => __DIR__ . '/../../..' . '/lib/private/Files/Type/TemplateManager.php',
Expand Down
45 changes: 24 additions & 21 deletions lib/private/Files/ObjectStore/S3ObjectTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
use Aws\S3\ObjectUploader;
use Aws\S3\S3Client;
use Icewind\Streams\CallbackWrapper;
use OC\Files\Stream\SeekableHttpStream;

const S3_UPLOAD_PART_SIZE = 524288000; // 500MB

Expand All @@ -49,27 +50,29 @@ abstract protected function getConnection();
* @since 7.0.0
*/
function readObject($urn) {
$client = $this->getConnection();
$command = $client->getCommand('GetObject', [
'Bucket' => $this->bucket,
'Key' => $urn
]);
$request = \Aws\serialize($command);
$headers = [];
foreach ($request->getHeaders() as $key => $values) {
foreach ($values as $value) {
$headers[] = "$key: $value";
return SeekableHttpStream::open(function ($range) use ($urn) {
$command = $this->getConnection()->getCommand('GetObject', [
'Bucket' => $this->bucket,
'Key' => $urn,
'Range' => 'bytes=' . $range,
]);
$request = \Aws\serialize($command);
$headers = [];
foreach ($request->getHeaders() as $key => $values) {
foreach ($values as $value) {
$headers[] = "$key: $value";
}
}
}
$opts = [
'http' => [
'protocol_version' => 1.1,
'header' => $headers
]
];
$opts = [
'http' => [
'protocol_version' => 1.1,
'header' => $headers,
],
];

$context = stream_context_create($opts);
return fopen($request->getUri(), 'r', false, $context);
$context = stream_context_create($opts);
return fopen($request->getUri(), 'r', false, $context);
});
}

/**
Expand All @@ -87,7 +90,7 @@ function writeObject($urn, $stream) {
$uploader = new MultipartUploader($this->getConnection(), $countStream, [
'bucket' => $this->bucket,
'key' => $urn,
'part_size' => S3_UPLOAD_PART_SIZE
'part_size' => S3_UPLOAD_PART_SIZE,
]);

try {
Expand All @@ -114,7 +117,7 @@ function writeObject($urn, $stream) {
function deleteObject($urn) {
$this->getConnection()->deleteObject([
'Bucket' => $this->bucket,
'Key' => $urn
'Key' => $urn,
]);
}

Expand Down
192 changes: 192 additions & 0 deletions lib/private/Files/Stream/SeekableHttpStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
<?php
/**
*
* @copyright Copyright (c) 2020, Lukas Stabe (lukas@stabe.de)
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

namespace OC\Files\Stream;

use Icewind\Streams\File;

/**
* A stream wrapper that uses http range requests to provide a seekable stream for http reading
*/
class SeekableHttpStream implements File {
private const PROTOCOL = 'httpseek';

private static $registered = false;

/**
* Registers the stream wrapper using the `httpseek://` url scheme
* $return void
*/
private static function registerIfNeeded() {
if (!self::$registered) {
stream_wrapper_register(
self::PROTOCOL,
self::class
);
self::$registered = true;
}
}

/**
* Open a readonly-seekable http stream
*
* The provided callback will be called with byte range and should return an http stream for the requested range
*
* @param callable $callback
* @return false|resource
*/
public static function open(callable $callback) {
$context = stream_context_create([
SeekableHttpStream::PROTOCOL => [
'callback' => $callback
],
]);

SeekableHttpStream::registerIfNeeded();
return fopen(SeekableHttpStream::PROTOCOL . '://', 'r', false, $context);
}

/** @var resource */
public $context;

/** @var callable */
private $openCallback;

/** @var resource */
private $current;
/** @var int */
private $offset = 0;

private function reconnect(int $start) {
$range = $start . '-';
if ($this->current != null) {
fclose($this->current);
}

$this->current = ($this->openCallback)($range);

if ($this->current === false) {
return false;
}

$responseHead = stream_get_meta_data($this->current)['wrapper_data'];
$rangeHeaders = array_values(array_filter($responseHead, function ($v) {
return preg_match('#^content-range:#i', $v) === 1;
}));
if (!$rangeHeaders) {
return false;
}
$contentRange = $rangeHeaders[0];

$content = trim(explode(':', $contentRange)[1]);
$range = trim(explode(' ', $content)[1]);
$begin = intval(explode('-', $range)[0]);

if ($begin !== $start) {
return false;
}

$this->offset = $begin;

return true;
}

public function stream_open($path, $mode, $options, &$opened_path) {
$options = stream_context_get_options($this->context)[self::PROTOCOL];
$this->openCallback = $options['callback'];

return $this->reconnect(0);
}

public function stream_read($count) {
if (!$this->current) {
return false;
}
$ret = fread($this->current, $count);
$this->offset += strlen($ret);
return $ret;
}

public function stream_seek($offset, $whence = SEEK_SET) {
switch ($whence) {
case SEEK_SET:
if ($offset === $this->offset) {
return true;
}
return $this->reconnect($offset);
case SEEK_CUR:
if ($offset === 0) {
return true;
}
return $this->reconnect($this->offset + $offset);
case SEEK_END:
return false;
}
return false;
}

public function stream_tell() {
return $this->offset;
}

public function stream_stat() {
if (is_resource($this->current)) {
return fstat($this->current);
} else {
return false;
}
}

public function stream_eof() {
if (is_resource($this->current)) {
return feof($this->current);
} else {
return true;
}
}

public function stream_close() {
if (is_resource($this->current)) {
fclose($this->current);
}
}

public function stream_write($data) {
return false;
}

public function stream_set_option($option, $arg1, $arg2) {
return false;
}

public function stream_truncate($size) {
return false;
}

public function stream_lock($operation) {
return false;
}

public function stream_flush() {
return; //noop because readonly stream
}
}
2 changes: 1 addition & 1 deletion tests/lib/Files/ObjectStore/ObjectStoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ abstract class ObjectStoreTest extends TestCase {
*/
abstract protected function getInstance();

private function stringToStream($data) {
protected function stringToStream($data) {
$stream = fopen('php://temp', 'w+');
fwrite($stream, $data);
rewind($stream);
Expand Down
18 changes: 17 additions & 1 deletion tests/lib/Files/ObjectStore/S3Test.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
class MultiPartUploadS3 extends S3 {
function writeObject($urn, $stream) {
$this->getConnection()->upload($this->bucket, $urn, $stream, 'private', [
'mup_threshold' => 1
'mup_threshold' => 1,
]);
}
}
Expand Down Expand Up @@ -83,4 +83,20 @@ public function testUploadNonSeekable() {

$this->assertEquals(file_get_contents(__FILE__), stream_get_contents($result));
}

public function testSeek() {
$data = file_get_contents(__FILE__);

$instance = $this->getInstance();
$instance->writeObject('seek', $this->stringToStream($data));

$read = $instance->readObject('seek');
$this->assertEquals(substr($data, 0, 100), fread($read, 100));

fseek($read, 10);
$this->assertEquals(substr($data, 10, 100), fread($read, 100));

fseek($read, 100, SEEK_CUR);
$this->assertEquals(substr($data, 210, 100), fread($read, 100));
}
}

0 comments on commit 1d816ad

Please sign in to comment.