Merge branch 'apache:master' into ARROW-14229
bkmgit authored Oct 21, 2021
2 parents b4807e3 + 6f478d0 commit facb2ed
Showing 13 changed files with 419 additions and 56 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/kernels/vector_sort_test.cc
@@ -1690,7 +1690,7 @@ class TestTableSortIndicesRandom : public testing::TestWithParam<RandomParam> {
     for (const auto& pair : sort_columns_) {
       ColumnComparator comparator(pair.second, options_.null_placement);
       const auto& chunked_array = *pair.first;
-      int64_t lhs_index, rhs_index;
+      int64_t lhs_index = 0, rhs_index = 0;
       const Array* lhs_array = FindTargetArray(chunked_array, lhs, &lhs_index);
       const Array* rhs_array = FindTargetArray(chunked_array, rhs, &rhs_index);
       int compared = comparator(*lhs_array, *rhs_array, lhs_index, rhs_index);
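
The one-line change above default-initializes the out-parameters before their addresses are handed to FindTargetArray. Initializing such variables up front guards against reading an indeterminate value if the callee ever returns without assigning them, and silences compiler warnings to that effect. A reduced sketch of the pattern, with hypothetical names, not code from this commit:

#include <cstdint>

// Hypothetical lookup that reports a position through an out-parameter,
// but only on the success path.
const int* FindNonZero(const int* data, int64_t n, int64_t* out_index) {
  for (int64_t i = 0; i < n; ++i) {
    if (data[i] != 0) {
      *out_index = i;
      return &data[i];
    }
  }
  return nullptr;  // *out_index is left untouched on this path
}

void Caller(const int* data, int64_t n) {
  int64_t index = 0;  // initialized up front, as in the patch above, so the
                      // value is never indeterminate even when no match is found
  const int* match = FindNonZero(data, n, &index);
  (void)match;
}
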
63 changes: 59 additions & 4 deletions cpp/src/arrow/filesystem/gcsfs.cc
@@ -19,6 +19,7 @@
 
 #include <google/cloud/storage/client.h>
 
+#include "arrow/buffer.h"
 #include "arrow/filesystem/gcsfs_internal.h"
 #include "arrow/filesystem/path_util.h"
 #include "arrow/result.h"
@@ -28,6 +29,8 @@ namespace arrow {
 namespace fs {
 namespace {
 
+namespace gcs = google::cloud::storage;
+
 auto constexpr kSep = '/';
 
 struct GcsPath {
@@ -58,9 +61,48 @@
   }
 };
 
-}  // namespace
+class GcsInputStream : public arrow::io::InputStream {
+ public:
+  explicit GcsInputStream(gcs::ObjectReadStream stream) : stream_(std::move(stream)) {}
 
-namespace gcs = google::cloud::storage;
+  ~GcsInputStream() override = default;
 
+  Status Close() override {
+    stream_.Close();
+    return Status::OK();
+  }
+
+  Result<int64_t> Tell() const override {
+    if (!stream_) {
+      return Status::IOError("invalid stream");
+    }
+    return stream_.tellg();
+  }
+
+  bool closed() const override { return !stream_.IsOpen(); }
+
+  Result<int64_t> Read(int64_t nbytes, void* out) override {
+    stream_.read(static_cast<char*>(out), nbytes);
+    if (!stream_.status().ok()) {
+      return internal::ToArrowStatus(stream_.status());
+    }
+    return stream_.gcount();
+  }
+
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes));
+    stream_.read(reinterpret_cast<char*>(buffer->mutable_data()), nbytes);
+    if (!stream_.status().ok()) {
+      return internal::ToArrowStatus(stream_.status());
+    }
+    return arrow::SliceMutableBufferSafe(std::move(buffer), 0, stream_.gcount());
+  }
+
+ private:
+  mutable gcs::ObjectReadStream stream_;
+};
+
+}  // namespace
 
 google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
   auto options = google::cloud::Options{};
@@ -95,6 +137,14 @@ class GcsFileSystem::Impl {
     return GetFileInfoImpl(path, std::move(meta).status(), FileType::Directory);
   }
 
+  Result<std::shared_ptr<io::InputStream>> OpenInputStream(const GcsPath& path) {
+    auto stream = client_.ReadObject(path.bucket, path.object);
+    if (!stream.status().ok()) {
+      return internal::ToArrowStatus(stream.status());
+    }
+    return std::make_shared<GcsInputStream>(std::move(stream));
+  }
+
  private:
   static Result<FileInfo> GetFileInfoImpl(const GcsPath& path,
                                           const google::cloud::Status& status,
@@ -169,12 +219,17 @@ Status GcsFileSystem::CopyFile(const std::string& src, const std::string& dest)
 
 Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
     const std::string& path) {
-  return Status::NotImplemented("The GCS FileSystem is not fully implemented");
+  ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
+  return impl_->OpenInputStream(p);
 }
 
 Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
     const FileInfo& info) {
-  return Status::NotImplemented("The GCS FileSystem is not fully implemented");
+  if (!info.IsFile()) {
+    return Status::IOError("Only files can be opened as input streams");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
+  return impl_->OpenInputStream(p);
 }
 
 Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
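
Taken together, these changes let callers stream GCS objects through the generic arrow::io::InputStream interface, as the new tests below exercise. A minimal usage sketch, illustrative only and not part of this commit — the helper name ReadWholeObject and the 4096-byte chunk size are arbitrary choices:

#include <memory>
#include <string>

#include "arrow/buffer.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/result.h"

// Hypothetical helper: drain an object into a string via the new stream API.
arrow::Result<std::string> ReadWholeObject(arrow::fs::FileSystem* fs,
                                           const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto stream, fs->OpenInputStream(path));
  std::string contents;
  while (true) {
    // Read(nbytes) may return fewer bytes than requested; an empty buffer
    // signals end-of-stream.
    ARROW_ASSIGN_OR_RAISE(auto buffer, stream->Read(4096));
    if (buffer->size() == 0) break;
    contents.append(buffer->ToString());
  }
  return contents;
}
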
4 changes: 1 addition & 3 deletions cpp/src/arrow/filesystem/gcsfs_internal.cc
@@ -38,10 +38,8 @@ Status ToArrowStatus(const google::cloud::Status& s) {
     case google::cloud::StatusCode::kInvalidArgument:
       return Status::Invalid(os.str());
     case google::cloud::StatusCode::kDeadlineExceeded:
-      return Status::IOError(os.str());
     case google::cloud::StatusCode::kNotFound:
-      // TODO: it is unclear if a better mapping would be possible.
-      return Status::UnknownError(os.str());
+      return Status::IOError(os.str());
     case google::cloud::StatusCode::kAlreadyExists:
       return Status::AlreadyExists(os.str());
     case google::cloud::StatusCode::kPermissionDenied:
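
With this remapping, a missing object now surfaces as an IOError instead of an UnknownError, so callers can branch on it the same way as on other I/O failures. A minimal sketch, assuming ToArrowStatus is declared in arrow::fs::internal via gcsfs_internal.h:

#include <google/cloud/status.h>

#include "arrow/filesystem/gcsfs_internal.h"
#include "arrow/status.h"

void Example() {
  google::cloud::Status not_found(google::cloud::StatusCode::kNotFound,
                                  "object does not exist");
  arrow::Status st = arrow::fs::internal::ToArrowStatus(not_found);
  // Previously this mapped to UnknownError; after this change:
  // st.IsIOError() == true
}
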
109 changes: 102 additions & 7 deletions cpp/src/arrow/filesystem/gcsfs_test.cc
@@ -24,11 +24,13 @@
 #include <google/cloud/storage/options.h>
 #include <gtest/gtest.h>
 
+#include <array>
 #include <boost/process.hpp>
 #include <string>
 
 #include "arrow/filesystem/gcsfs_internal.h"
 #include "arrow/filesystem/test_util.h"
+#include "arrow/testing/gtest_util.h"
 #include "arrow/testing/util.h"
 
 namespace arrow {
@@ -45,6 +47,15 @@ using ::testing::Not;
 using ::testing::NotNull;
 
 auto const* kPreexistingBucket = "test-bucket-name";
+auto const* kPreexistingObject = "test-object-name";
+auto const* kLoremIpsum = R"""(
+Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
+incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis
+nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
+Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu
+fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
+culpa qui officia deserunt mollit anim id est laborum.
+)""";
 
 class GcsIntegrationTest : public ::testing::Test {
  public:
@@ -65,16 +76,29 @@ class GcsIntegrationTest : public ::testing::Test {
     server_process_ = bp::child(boost::this_process::environment(), exe_path, "-m",
                                 "testbench", "--port", port_);
 
-    // Create a bucket in the testbench. This makes it easier to bootstrap GcsFileSystem
-    // and its tests.
+    // Create a bucket and a small file in the testbench. This makes it easier to
+    // bootstrap GcsFileSystem and its tests.
     auto client = gcs::Client(
         google::cloud::Options{}
             .set<gcs::RestEndpointOption>("http://127.0.0.1:" + port_)
             .set<gc::UnifiedCredentialsOption>(gc::MakeInsecureCredentials()));
-    google::cloud::StatusOr<gcs::BucketMetadata> metadata = client.CreateBucketForProject(
+    google::cloud::StatusOr<gcs::BucketMetadata> bucket = client.CreateBucketForProject(
         kPreexistingBucket, "ignored-by-testbench", gcs::BucketMetadata{});
-    ASSERT_TRUE(metadata.ok()) << "Failed to create bucket <" << kPreexistingBucket
-                               << ">, status=" << metadata.status();
+    ASSERT_TRUE(bucket.ok()) << "Failed to create bucket <" << kPreexistingBucket
+                             << ">, status=" << bucket.status();
+
+    google::cloud::StatusOr<gcs::ObjectMetadata> object =
+        client.InsertObject(kPreexistingBucket, kPreexistingObject, kLoremIpsum);
+    ASSERT_TRUE(object.ok()) << "Failed to create object <" << kPreexistingObject
+                             << ">, status=" << object.status();
   }
 
+  static std::string PreexistingObjectPath() {
+    return std::string(kPreexistingBucket) + "/" + kPreexistingObject;
+  }
+
+  static std::string NotFoundObjectPath() {
+    return std::string(kPreexistingBucket) + "/not-found";
+  }
+
   GcsOptions TestGcsOptions() {
@@ -114,7 +138,7 @@ TEST(GcsFileSystem, ToArrowStatus) {
       {google::cloud::StatusCode::kUnknown, StatusCode::UnknownError},
       {google::cloud::StatusCode::kInvalidArgument, StatusCode::Invalid},
       {google::cloud::StatusCode::kDeadlineExceeded, StatusCode::IOError},
-      {google::cloud::StatusCode::kNotFound, StatusCode::UnknownError},
+      {google::cloud::StatusCode::kNotFound, StatusCode::IOError},
       {google::cloud::StatusCode::kAlreadyExists, StatusCode::AlreadyExists},
       {google::cloud::StatusCode::kPermissionDenied, StatusCode::IOError},
       {google::cloud::StatusCode::kUnauthenticated, StatusCode::IOError},
@@ -159,11 +183,82 @@ TEST(GcsFileSystem, FileSystemCompare) {
   EXPECT_FALSE(a->Equals(*b));
 }
 
-TEST_F(GcsIntegrationTest, MakeBucket) {
+TEST_F(GcsIntegrationTest, GetFileInfoBucket) {
   auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
   arrow::fs::AssertFileInfo(fs.get(), kPreexistingBucket, FileType::Directory);
 }
 
+TEST_F(GcsIntegrationTest, GetFileInfoObject) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+  arrow::fs::AssertFileInfo(fs.get(), PreexistingObjectPath(), FileType::File);
+}
+
+TEST_F(GcsIntegrationTest, ReadObjectString) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  std::shared_ptr<io::InputStream> stream;
+  ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(PreexistingObjectPath()));
+
+  std::array<char, 1024> buffer{};
+  std::int64_t size;
+  ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
+
+  EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
+}
+
+TEST_F(GcsIntegrationTest, ReadObjectStringBuffers) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  std::shared_ptr<io::InputStream> stream;
+  ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(PreexistingObjectPath()));
+
+  std::string contents;
+  std::shared_ptr<Buffer> buffer;
+  do {
+    ASSERT_OK_AND_ASSIGN(buffer, stream->Read(16));
+    contents.append(buffer->ToString());
+  } while (buffer && buffer->size() != 0);
+
+  EXPECT_EQ(contents, kLoremIpsum);
+}
+
+TEST_F(GcsIntegrationTest, ReadObjectInfo) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  arrow::fs::FileInfo info;
+  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(PreexistingObjectPath()));
+
+  std::shared_ptr<io::InputStream> stream;
+  ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream(info));
+
+  std::array<char, 1024> buffer{};
+  std::int64_t size;
+  ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data()));
+
+  EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum);
+}
+
+TEST_F(GcsIntegrationTest, ReadObjectNotFound) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  auto result = fs->OpenInputStream(NotFoundObjectPath());
+  EXPECT_EQ(result.status().code(), StatusCode::IOError);
+}
+
+TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  arrow::fs::FileInfo info;
+  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(kPreexistingBucket));
+
+  auto result = fs->OpenInputStream(info);
+  EXPECT_EQ(result.status().code(), StatusCode::IOError);
+
+  ASSERT_OK_AND_ASSIGN(info, fs->GetFileInfo(NotFoundObjectPath()));
+  result = fs->OpenInputStream(info);
+  EXPECT_EQ(result.status().code(), StatusCode::IOError);
+}
+
 }  // namespace
 }  // namespace fs
 }  // namespace arrow
4 changes: 2 additions & 2 deletions cpp/thirdparty/versions.txt
@@ -53,8 +53,8 @@ ARROW_GOOGLE_CLOUD_CPP_BUILD_VERSION=v1.32.0
 ARROW_GOOGLE_CLOUD_CPP_BUILD_SHA256_CHECKSUM=c62338389f76915179fe61d8c0f5fefa06131b4e0d7312707af5309b1394e099
 ARROW_GRPC_BUILD_VERSION=v1.35.0
 ARROW_GRPC_BUILD_SHA256_CHECKSUM=27dd2fc5c9809ddcde8eb6fa1fa278a3486566dfc28335fca13eb8df8bd3b958
-ARROW_GTEST_BUILD_VERSION=1.11.0
-ARROW_GTEST_BUILD_SHA256_CHECKSUM=b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5
+ARROW_GTEST_BUILD_VERSION=1.10.0
+ARROW_GTEST_BUILD_SHA256_CHECKSUM=9dc9157a9a1551ec7a7e43daea9a694a0bb5fb8bec81235d8a1e6ef64c716dcb
 ARROW_JEMALLOC_BUILD_VERSION=5.2.1
 ARROW_JEMALLOC_BUILD_SHA256_CHECKSUM=34330e5ce276099e2e8950d9335db5a875689a4c6a56751ef3b1d8c537f887f6
 ARROW_LZ4_BUILD_VERSION=v1.9.3
44 changes: 22 additions & 22 deletions docs/source/python/dataset.rst
@@ -583,28 +583,28 @@ which columns are used to partition the dataset. This is useful when you expect
 query your data in specific ways and you can utilize partitioning to reduce the
 amount of data you need to read.
 
-.. To add when ARROW-12364 is merged
-   Customizing & inspecting written files
-   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-   By default the dataset API will create files named "part-i.format" where "i" is a integer
-   generated during the write and "format" is the file format specified in the write_dataset
-   call. For simple datasets it may be possible to know which files will be created but for
-   larger or partitioned datasets it is not so easy. The ``file_visitor`` keyword can be used
-   to supply a visitor that will be called as each file is created:
-   .. ipython:: python
-      def file_visitor(written_file):
-          print(f"path={written_file.path}")
-          print(f"metadata={written_file.metadata}")
-      ds.write_dataset(table, dataset_root, format="parquet", partitioning=part,
-                       file_visitor=file_visitor)
-   This will allow you to collect the filenames that belong to the dataset and store them elsewhere
-   which can be useful when you want to avoid scanning directories the next time you need to read
-   the data. It can also be used to generate the _metadata index file used by other tools such as
-   dask or spark to create an index of the dataset.
+Customizing & inspecting written files
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+By default the dataset API will create files named "part-i.format" where "i" is an integer
+generated during the write and "format" is the file format specified in the write_dataset
+call. For simple datasets it may be possible to know which files will be created, but for
+larger or partitioned datasets it is not so easy. The ``file_visitor`` keyword can be used
+to supply a visitor that will be called as each file is created:
+
+.. ipython:: python
+
+    def file_visitor(written_file):
+        print(f"path={written_file.path}")
+        print(f"metadata={written_file.metadata}")
+
+    ds.write_dataset(table, base / "dataset_visited", format="parquet", partitioning=part,
+                     file_visitor=file_visitor)
+
+This will allow you to collect the filenames that belong to the dataset and store them elsewhere,
+which can be useful when you want to avoid scanning directories the next time you need to read
+the data. It can also be used to generate the _metadata index file used by other tools such as
+dask or spark to create an index of the dataset.
 
 Configuring format-specific parameters during a write
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~