Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): fix tellg() values #12844

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions google/cloud/storage/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ ObjectReadStream Client::ReadObjectImpl(
}
auto stream =
ObjectReadStream(std::make_unique<internal::ObjectReadStreambuf>(
request, *std::move(source),
request.GetOption<ReadFromOffset>().value_or(0)));
request, *std::move(source)));
(void)stream.peek();
#if !GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
// Without exceptions the streambuf cannot report errors, so we have to
Expand Down
23 changes: 20 additions & 3 deletions google/cloud/storage/internal/object_read_streambuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,22 @@ namespace cloud {
namespace storage {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace internal {
namespace {

std::streamoff InitialOffset(ReadObjectRangeRequest const& request) {
if (request.HasOption<ReadLast>()) {
return -request.GetOption<ReadLast>().value();
}
return request.StartingByte();
}

} // namespace

ObjectReadStreambuf::ObjectReadStreambuf(
ReadObjectRangeRequest const& request,
std::unique_ptr<ObjectReadSource> source, std::streamoff pos_in_stream)
std::unique_ptr<ObjectReadSource> source)
: source_(std::move(source)),
source_pos_(pos_in_stream),
source_pos_(static_cast<std::streamoff>(InitialOffset(request))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to static cast to std::streamoff when it looks like InitialOffset() already returns a std::streamoff?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

D'oh! That is what happens when you refactor to a function and forget to clean up the call site.

hash_function_(CreateHashFunction(request)),
hash_validator_(CreateHashValidator(request)) {}

Expand All @@ -51,6 +61,7 @@ ObjectReadStreambuf::pos_type ObjectReadStreambuf::seekoff(
// recreated in the general case, which doesn't fit the current code
// organization. We can, however, at least implement the bare minimum of this
// function allowing `tellg()` to work.
if (source_pos_ < 0) return -1;
if (which == std::ios_base::in && dir == std::ios_base::cur && off == 0) {
return source_pos_ - in_avail();
}
Expand Down Expand Up @@ -198,7 +209,6 @@ std::streamsize ObjectReadStreambuf::xsgetn(char* s, std::streamsize count) {
hash_function_->Update(absl::string_view{s + offset, read->bytes_received});
hash_validator_->ProcessHashValues(read->hashes);
offset += static_cast<std::streamsize>(read->bytes_received);
source_pos_ += static_cast<std::streamoff>(read->bytes_received);
for (auto const& kv : read->response.headers) {
headers_.emplace(kv.first, kv.second);
}
Expand All @@ -207,6 +217,13 @@ std::streamsize ObjectReadStreambuf::xsgetn(char* s, std::streamsize count) {
if (!storage_class_) storage_class_ = std::move(read->storage_class);
if (!size_) size_ = std::move(read->size);
if (!transformation_) transformation_ = std::move(read->transformation);

if (source_pos_ >= 0) {
source_pos_ += static_cast<std::streamoff>(read->bytes_received);
} else if (size_) {
source_pos_ = *size_ + source_pos_ +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use source_pos_ += ... too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

static_cast<std::streamoff>(read->bytes_received);
}
return run_validator_if_closed(Status());
}

Expand Down
3 changes: 1 addition & 2 deletions google/cloud/storage/internal/object_read_streambuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ namespace internal {
class ObjectReadStreambuf : public std::basic_streambuf<char> {
public:
ObjectReadStreambuf(ReadObjectRangeRequest const& request,
std::unique_ptr<ObjectReadSource> source,
std::streamoff pos_in_stream);
std::unique_ptr<ObjectReadSource> source);

/// Create a streambuf in a permanent error status.
ObjectReadStreambuf(ReadObjectRangeRequest const& request, Status status);
Expand Down
83 changes: 81 additions & 2 deletions google/cloud/storage/internal/object_read_streambuf_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,85 @@ TEST(ObjectReadStreambufTest, FailedTellg) {
EXPECT_EQ(-1, stream.tellg());
}

TEST(ObjectReadStreambufTest, TellgReadFromOffset) {
auto read_source = std::make_unique<testing::MockObjectReadSource>();
EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true));
EXPECT_CALL(*read_source, Read)
.WillOnce(Return(ReadSourceResult{15, {}}))
.WillOnce(Return(ReadSourceResult{10, {}}));
ObjectReadStreambuf buf(
ReadObjectRangeRequest{}.set_option(ReadFromOffset(1024)),
std::move(read_source));

std::istream stream(&buf);
EXPECT_EQ(stream.tellg(), 1024);
std::vector<char> v(1024);
stream.read(v.data(), 15);
EXPECT_EQ(stream.tellg(), 1024 + 15);
stream.read(v.data(), 10);
EXPECT_EQ(stream.tellg(), 1024 + 15 + 10);
}

TEST(ObjectReadStreambufTest, TellgReadRange) {
auto read_source = std::make_unique<testing::MockObjectReadSource>();
EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true));
EXPECT_CALL(*read_source, Read)
.WillOnce(Return(ReadSourceResult{15, {}}))
.WillOnce(Return(ReadSourceResult{10, {}}));
ObjectReadStreambuf buf(
ReadObjectRangeRequest{}.set_option(ReadRange(2048, 4096)),
std::move(read_source));

std::istream stream(&buf);
EXPECT_EQ(stream.tellg(), 2048);
std::vector<char> v(1024);
stream.read(v.data(), 15);
EXPECT_EQ(stream.tellg(), 2048 + 15);
stream.read(v.data(), 10);
EXPECT_EQ(stream.tellg(), 2048 + 15 + 10);
}

TEST(ObjectReadStreambufTest, TellgReadLastUnknownSize) {
auto read_source = std::make_unique<testing::MockObjectReadSource>();
EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true));
EXPECT_CALL(*read_source, Read)
.WillOnce(Return(ReadSourceResult{15, {}}))
.WillOnce(Return(ReadSourceResult{10, {}}));
ObjectReadStreambuf buf(ReadObjectRangeRequest{}.set_option(ReadLast(512)),
std::move(read_source));

std::istream stream(&buf);
EXPECT_EQ(stream.tellg(), -1);
std::vector<char> v(1024);
stream.read(v.data(), 15);
EXPECT_EQ(stream.tellg(), -1);
stream.read(v.data(), 10);
EXPECT_EQ(stream.tellg(), -1);
}

TEST(ObjectReadStreambufTest, TellgReadLastSize) {
auto make_response = [](std::size_t count) {
auto response = ReadSourceResult{count, {}};
response.size = 2048;
return response;
};
auto read_source = std::make_unique<testing::MockObjectReadSource>();
EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true));
EXPECT_CALL(*read_source, Read)
.WillOnce(Return(make_response(15)))
.WillOnce(Return(make_response(10)));
ObjectReadStreambuf buf(ReadObjectRangeRequest{}.set_option(ReadLast(512)),
std::move(read_source));

std::istream stream(&buf);
EXPECT_EQ(stream.tellg(), -1);
std::vector<char> v(1024);
stream.read(v.data(), 15);
EXPECT_EQ(stream.tellg(), 2048 - 512 + 15);
stream.read(v.data(), 10);
EXPECT_EQ(stream.tellg(), 2048 - 512 + 15 + 10);
}

TEST(ObjectReadStreambufTest, Success) {
auto read_source = std::make_unique<testing::MockObjectReadSource>();
EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true));
Expand All @@ -40,7 +119,7 @@ TEST(ObjectReadStreambufTest, Success) {
.WillOnce(Return(ReadSourceResult{15, {}}))
.WillOnce(Return(ReadSourceResult{15, {}}))
.WillOnce(Return(ReadSourceResult{128 * 1024, {}}));
ObjectReadStreambuf buf(ReadObjectRangeRequest{}, std::move(read_source), 0);
ObjectReadStreambuf buf(ReadObjectRangeRequest{}, std::move(read_source));

std::istream stream(&buf);
EXPECT_EQ(0, stream.tellg());
Expand Down Expand Up @@ -68,7 +147,7 @@ TEST(ObjectReadStreambufTest, WrongSeek) {
auto read_source = std::make_unique<testing::MockObjectReadSource>();
EXPECT_CALL(*read_source, IsOpen()).WillRepeatedly(Return(true));
EXPECT_CALL(*read_source, Read).WillOnce(Return(ReadSourceResult{10, {}}));
ObjectReadStreambuf buf(ReadObjectRangeRequest{}, std::move(read_source), 0);
ObjectReadStreambuf buf(ReadObjectRangeRequest{}, std::move(read_source));

std::istream stream(&buf);
EXPECT_EQ(0, stream.tellg());
Expand Down
29 changes: 29 additions & 0 deletions google/cloud/storage/tests/object_media_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,35 @@ TEST_F(ObjectMediaIntegrationTest, ReadLastChunkReadLast) {
EXPECT_EQ(large_text.substr(kObjectSize - 129 * kKiB), actual);
}

/// @test Read the last chunk of an object by setting ReadLast option.
TEST_F(ObjectMediaIntegrationTest, ReadLastTellg) {
StatusOr<Client> client = MakeIntegrationTestClient();
ASSERT_STATUS_OK(client);

auto object_name = MakeRandomObjectName();

// This produces an object larger than 3MiB, but with a size that is not a
// multiple of 128KiB.
auto constexpr kKiB = 1024L;
auto constexpr kObjectSize = 256 * kKiB;
auto const expected = MakeRandomData(kObjectSize);
auto insert = client->InsertObject(bucket_name_, object_name, expected,
IfGenerationMatch(0));
ASSERT_STATUS_OK(insert);
ScheduleForDelete(*insert);

// Create an iostream to read the last 129KiB of the object, but simulate an
// application that does not know how large that last chunk is.
auto is = client->ReadObject(bucket_name_, object_name, ReadLast(129 * kKiB));
EXPECT_EQ(is.tellg(), kObjectSize - 129 * kKiB);
std::vector<char> buffer(1024 * kKiB);
is.read(buffer.data(), 1000);
EXPECT_EQ(is.gcount(), 1000);
EXPECT_EQ(is.tellg(), kObjectSize - 129 * kKiB + 1000);
auto const actual = std::string(buffer.data(), 1000);
EXPECT_EQ(actual, expected.substr(kObjectSize - 129 * kKiB, 1000));
}

/// @test Read an object by chunks of equal size.
TEST_F(ObjectMediaIntegrationTest, ReadByChunk) {
StatusOr<Client> client = MakeIntegrationTestClient();
Expand Down