From d77925efd12ee0b1b3d7ce8113ee4d35cd3c008d Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 21 Jun 2016 14:23:38 -0700 Subject: [PATCH] ARROW-222: Prototyping an IO interface for Arrow, with initial HDFS target - Switch Travis CI back to Ubuntu trusty (old Boost in precise has issues with C++11) - Adapt SFrame libhdfs shim for arrow - Create C++ public API within arrow:io to libhdfs - Implement and test many functions in libhdfs - Start Cython wrapper interface to arrow_io. Begin Python file-like interface, unit tests - Add thirdparty hdfs.h so builds are possible without a local Hadoop distro (e.g. in Travis CI). Change-Id: I4a46e50f6c1c22787baa3749d8a542216341e630 --- .travis.yml | 5 +- NOTICE.txt | 9 + ci/travis_before_script_cpp.sh | 15 +- cpp/CMakeLists.txt | 60 +- cpp/doc/HDFS.md | 39 + cpp/src/arrow/io/CMakeLists.txt | 97 ++ cpp/src/arrow/io/hdfs-io-test.cc | 312 +++++++ cpp/src/arrow/io/hdfs.cc | 458 ++++++++++ cpp/src/arrow/io/hdfs.h | 213 +++++ cpp/src/arrow/io/interfaces.h | 71 ++ cpp/src/arrow/io/libhdfs_shim.cc | 544 ++++++++++++ cpp/src/arrow/parquet/parquet-io-test.cc | 4 +- cpp/thirdparty/hadoop/include/hdfs.h | 1024 ++++++++++++++++++++++ dev/merge_arrow_pr.py | 5 +- python/CMakeLists.txt | 6 +- python/cmake_modules/FindArrow.cmake | 17 +- python/conda.recipe/meta.yaml | 1 + python/pyarrow/error.pxd | 4 +- python/pyarrow/error.pyx | 14 +- python/pyarrow/includes/common.pxd | 18 + python/pyarrow/includes/libarrow.pxd | 19 - python/pyarrow/includes/libarrow_io.pxd | 93 ++ python/pyarrow/io.pyx | 503 +++++++++++ python/pyarrow/tests/test_array.py | 47 +- python/pyarrow/tests/test_io.py | 126 +++ python/setup.py | 9 +- 26 files changed, 3652 insertions(+), 61 deletions(-) create mode 100644 NOTICE.txt create mode 100644 cpp/doc/HDFS.md create mode 100644 cpp/src/arrow/io/CMakeLists.txt create mode 100644 cpp/src/arrow/io/hdfs-io-test.cc create mode 100644 cpp/src/arrow/io/hdfs.cc create mode 100644 cpp/src/arrow/io/hdfs.h create mode 100644 cpp/src/arrow/io/interfaces.h create mode 100644 cpp/src/arrow/io/libhdfs_shim.cc create mode 100644 cpp/thirdparty/hadoop/include/hdfs.h create mode 100644 python/pyarrow/includes/libarrow_io.pxd create mode 100644 python/pyarrow/io.pyx create mode 100644 python/pyarrow/tests/test_io.py diff --git a/.travis.yml b/.travis.yml index ac2b0d457cb8e..97229b1ceb3bc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,5 @@ sudo: required -dist: precise +dist: trusty addons: apt: sources: @@ -12,6 +12,9 @@ addons: - ccache - cmake - valgrind + - libboost-dev + - libboost-filesystem-dev + - libboost-system-dev matrix: fast_finish: true diff --git a/NOTICE.txt b/NOTICE.txt new file mode 100644 index 0000000000000..0310c897cd743 --- /dev/null +++ b/NOTICE.txt @@ -0,0 +1,9 @@ +Apache Arrow +Copyright 2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This product includes software from the SFrame project (BSD, 3-clause). +* Copyright (C) 2015 Dato, Inc. +* Copyright (c) 2009 Carnegie Mellon University. 
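For orientation, here is a minimal sketch (not part of the patch) of how the new C++ API declared in `cpp/src/arrow/io/hdfs.h` below might be used end to end. The host, port, user, and paths are illustrative assumptions; error handling uses the `RETURN_NOT_OK` macro from `arrow/util/status.h`, as the patch itself does.

```cpp
// Sketch only: connect, write a small file, and read it back via arrow::io.
// Assumes a reachable namenode and that libhdfs/libjvm can be located at
// runtime (see cpp/doc/HDFS.md in this patch).
#include <cstdint>
#include <memory>
#include <vector>

#include "arrow/io/hdfs.h"
#include "arrow/util/status.h"

arrow::Status WriteAndReadBack() {
  namespace io = arrow::io;

  io::HdfsConnectionConfig conf;
  conf.host = "localhost";  // assumption: local test cluster
  conf.port = 20500;        // assumption: same default as hdfs-io-test.cc
  conf.user = "hadoop";     // assumption: placeholder user name

  std::shared_ptr<io::HdfsClient> client;
  RETURN_NOT_OK(io::HdfsClient::Connect(&conf, &client));

  RETURN_NOT_OK(client->CreateDirectory("/tmp/arrow-example"));

  // Write with default buffer size / replication / block size (the two-argument
  // OpenWriteable overload)
  std::vector<uint8_t> data = {1, 2, 3, 4, 5};
  std::shared_ptr<io::HdfsWriteableFile> out;
  RETURN_NOT_OK(client->OpenWriteable("/tmp/arrow-example/data", false, &out));
  RETURN_NOT_OK(out->Write(data.data(), static_cast<int32_t>(data.size())));
  RETURN_NOT_OK(out->Close());

  // Read the bytes back
  std::shared_ptr<io::HdfsReadableFile> in;
  RETURN_NOT_OK(client->OpenReadable("/tmp/arrow-example/data", &in));
  uint8_t buffer[5];
  int32_t bytes_read = 0;
  RETURN_NOT_OK(in->Read(static_cast<int32_t>(sizeof(buffer)), &bytes_read, buffer));

  return client->Disconnect();
}
```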
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh index 9060cc9b5ef22..08551f3b009a8 100755 --- a/ci/travis_before_script_cpp.sh +++ b/ci/travis_before_script_cpp.sh @@ -23,12 +23,21 @@ echo $GTEST_HOME : ${ARROW_CPP_INSTALL=$TRAVIS_BUILD_DIR/cpp-install} -CMAKE_COMMON_FLAGS="-DARROW_BUILD_BENCHMARKS=ON -DARROW_PARQUET=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL" +CMAKE_COMMON_FLAGS="\ +-DARROW_BUILD_BENCHMARKS=ON \ +-DARROW_PARQUET=ON \ +-DARROW_HDFS=on \ +-DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL" if [ $TRAVIS_OS_NAME == "linux" ]; then - cmake -DARROW_TEST_MEMCHECK=on $CMAKE_COMMON_FLAGS -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR + cmake -DARROW_TEST_MEMCHECK=on \ + $CMAKE_COMMON_FLAGS \ + -DCMAKE_CXX_FLAGS="-Werror" \ + $CPP_DIR else - cmake $CMAKE_COMMON_FLAGS -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR + cmake $CMAKE_COMMON_FLAGS \ + -DCMAKE_CXX_FLAGS="-Werror" \ + $CPP_DIR fi make -j4 diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index bdf757238cc6b..18b47599b93d0 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -62,6 +62,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") "Build the Arrow IPC extensions" ON) + option(ARROW_HDFS + "Build the Arrow IO extensions for the Hadoop file system" + OFF) + option(ARROW_SSE3 "Build Arrow with SSE3" ON) @@ -454,6 +458,47 @@ if ("$ENV{GBENCHMARK_HOME}" STREQUAL "") set(GBENCHMARK_HOME ${THIRDPARTY_DIR}/installed) endif() +# ---------------------------------------------------------------------- +# Add Boost dependencies (code adapted from Apache Kudu (incubating)) + +# find boost headers and libs +set(Boost_DEBUG TRUE) +set(Boost_USE_MULTITHREADED ON) +set(Boost_USE_STATIC_LIBS ON) +find_package(Boost COMPONENTS system filesystem REQUIRED) +include_directories(SYSTEM ${Boost_INCLUDE_DIRS}) +set(BOOST_STATIC_LIBS ${Boost_LIBRARIES}) +list(LENGTH BOOST_STATIC_LIBS BOOST_STATIC_LIBS_LEN) + +# Find Boost shared libraries. +set(Boost_USE_STATIC_LIBS OFF) +find_package(Boost COMPONENTS system filesystem REQUIRED) +set(BOOST_SHARED_LIBS ${Boost_LIBRARIES}) +list(LENGTH BOOST_SHARED_LIBS BOOST_SHARED_LIBS_LEN) +list(SORT BOOST_SHARED_LIBS) + +message(STATUS "Boost include dir: " ${Boost_INCLUDE_DIRS}) +message(STATUS "Boost libraries: " ${Boost_LIBRARIES}) + +math(EXPR LAST_IDX "${BOOST_STATIC_LIBS_LEN} - 1") +foreach(IDX RANGE ${LAST_IDX}) + list(GET BOOST_STATIC_LIBS ${IDX} BOOST_STATIC_LIB) + list(GET BOOST_SHARED_LIBS ${IDX} BOOST_SHARED_LIB) + + # Remove the prefix/suffix from the library name. + # + # e.g. libboost_system-mt --> boost_system + get_filename_component(LIB_NAME ${BOOST_STATIC_LIB} NAME_WE) + string(REGEX REPLACE "lib([^-]*)(-mt)?" 
"\\1" LIB_NAME_NO_PREFIX_SUFFIX ${LIB_NAME}) + ADD_THIRDPARTY_LIB(${LIB_NAME_NO_PREFIX_SUFFIX} + STATIC_LIB "${BOOST_STATIC_LIB}" + SHARED_LIB "${BOOST_SHARED_LIB}") + list(APPEND ARROW_BOOST_LIBS ${LIB_NAME_NO_PREFIX_SUFFIX}) +endforeach() +include_directories(SYSTEM ${Boost_INCLUDE_DIR}) + +# ---------------------------------------------------------------------- +# Enable / disable tests and benchmarks if(ARROW_BUILD_TESTS) add_custom_target(unittest ctest -L unittest) @@ -529,12 +574,24 @@ endif (UNIX) # "make lint" target ############################################################ if (UNIX) + + file(GLOB_RECURSE LINT_FILES + "${CMAKE_CURRENT_SOURCE_DIR}/src/*.h" + "${CMAKE_CURRENT_SOURCE_DIR}/src/*.cc" + ) + + FOREACH(item ${LINT_FILES}) + IF(NOT (item MATCHES "_generated.h")) + LIST(APPEND FILTERED_LINT_FILES ${item}) + ENDIF() + ENDFOREACH(item ${LINT_FILES}) + # Full lint add_custom_target(lint ${BUILD_SUPPORT_DIR}/cpplint.py --verbose=2 --linelength=90 --filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/c++11,-runtime/references - `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h | sed -e '/_generated/g'`) + ${FILTERED_LINT_FILES}) endif (UNIX) @@ -624,6 +681,7 @@ set_target_properties(arrow target_link_libraries(arrow ${LIBARROW_LINK_LIBS}) add_subdirectory(src/arrow) +add_subdirectory(src/arrow/io) add_subdirectory(src/arrow/util) add_subdirectory(src/arrow/types) diff --git a/cpp/doc/HDFS.md b/cpp/doc/HDFS.md new file mode 100644 index 0000000000000..e0d5dfda21d93 --- /dev/null +++ b/cpp/doc/HDFS.md @@ -0,0 +1,39 @@ +## Using Arrow's HDFS (Apache Hadoop Distributed File System) interface + +### Build requirements + +To build the integration, pass the following option to CMake + +```shell +-DARROW_HDFS=on +``` + +For convenience, we have bundled `hdfs.h` for libhdfs from Apache Hadoop in +Arrow's thirdparty. If you wish to build against the `hdfs.h` in your installed +Hadoop distribution, set the `$HADOOP_HOME` environment variable. + +### Runtime requirements + +By default, the HDFS client C++ class in `libarrow_io` uses the libhdfs JNI +interface to the Java Hadoop client. This library is loaded **at runtime** +(rather than at link / library load time, since the library may not be in your +LD_LIBRARY_PATH), and relies on some environment variables. + +* `HADOOP_HOME`: the root of your installed Hadoop distribution. Check in the + `lib/native` directory to look for `libhdfs.so` if you have any questions + about which directory you're after. +* `JAVA_HOME`: the location of your Java SDK installation +* `CLASSPATH`: must contain the Hadoop jars. You can set these using: + +```shell +export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob` +``` + +#### Setting $JAVA_HOME automatically on OS X + +The installed location of Java on OS X can vary, however the following snippet +will set it automatically for you: + +```shell +export JAVA_HOME=$(/usr/libexec/java_home) +``` \ No newline at end of file diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt new file mode 100644 index 0000000000000..33b654f81903f --- /dev/null +++ b/cpp/src/arrow/io/CMakeLists.txt @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# ---------------------------------------------------------------------- +# arrow_io : Arrow IO interfaces + +set(ARROW_IO_LINK_LIBS + arrow +) + +set(ARROW_IO_PRIVATE_LINK_LIBS + boost_system + boost_filesystem +) + +set(ARROW_IO_TEST_LINK_LIBS + arrow_io + ${ARROW_IO_PRIVATE_LINK_LIBS}) + +set(ARROW_IO_SRCS +) + +if(ARROW_HDFS) + if(NOT THIRDPARTY_DIR) + message(FATAL_ERROR "THIRDPARTY_DIR not set") + endif() + + if (DEFINED ENV{HADOOP_HOME}) + set(HADOOP_HOME $ENV{HADOOP_HOME}) + else() + set(HADOOP_HOME "${THIRDPARTY_DIR}/hadoop") + endif() + + set(HDFS_H_PATH "${HADOOP_HOME}/include/hdfs.h") + if (NOT EXISTS ${HDFS_H_PATH}) + message(FATAL_ERROR "Did not find hdfs.h at ${HDFS_H_PATH}") + endif() + message(STATUS "Found hdfs.h at: " ${HDFS_H_PATH}) + message(STATUS "Building libhdfs shim component") + + include_directories(SYSTEM "${HADOOP_HOME}/include") + + set(ARROW_HDFS_SRCS + hdfs.cc + libhdfs_shim.cc) + + set_property(SOURCE ${ARROW_HDFS_SRCS} + APPEND_STRING PROPERTY + COMPILE_FLAGS "-DHAS_HADOOP") + + set(ARROW_IO_SRCS + ${ARROW_HDFS_SRCS} + ${ARROW_IO_SRCS}) + + ADD_ARROW_TEST(hdfs-io-test) + ARROW_TEST_LINK_LIBRARIES(hdfs-io-test + ${ARROW_IO_TEST_LINK_LIBS}) +endif() + +add_library(arrow_io SHARED + ${ARROW_IO_SRCS} +) +target_link_libraries(arrow_io LINK_PUBLIC ${ARROW_IO_LINK_LIBS}) +target_link_libraries(arrow_io LINK_PRIVATE ${ARROW_IO_PRIVATE_LINK_LIBS}) + +SET_TARGET_PROPERTIES(arrow_io PROPERTIES LINKER_LANGUAGE CXX) + +if (APPLE) + set_target_properties(arrow_io + PROPERTIES + BUILD_WITH_INSTALL_RPATH ON + INSTALL_NAME_DIR "@rpath") +endif() + +# Headers: top level +install(FILES + hdfs.h + interfaces.h + DESTINATION include/arrow/io) + +install(TARGETS arrow_io + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib) diff --git a/cpp/src/arrow/io/hdfs-io-test.cc b/cpp/src/arrow/io/hdfs-io-test.cc new file mode 100644 index 0000000000000..d47699c620f87 --- /dev/null +++ b/cpp/src/arrow/io/hdfs-io-test.cc @@ -0,0 +1,312 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
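As a quick aside on building, the snippet below shows one plausible way to enable and exercise the new `arrow_io` HDFS component locally, following the CMake option added above and the conventions in `cpp/doc/HDFS.md`. The paths, the `unittest` label, and the `hdfs` test-name filter are assumptions, not part of the patch.

```shell
# Hypothetical local build enabling the HDFS component
export HADOOP_HOME=/opt/hadoop        # optional; the bundled thirdparty hdfs.h is used otherwise
cmake -DARROW_HDFS=on /path/to/arrow/cpp
make -j4
# hdfs-io-test skips gracefully if libhdfs cannot be loaded at runtime
ctest -R hdfs
```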
+ +#include +#include +#include +#include + +#include "gtest/gtest.h" + +#include // NOLINT + +#include "arrow/io/hdfs.h" +#include "arrow/test-util.h" +#include "arrow/util/status.h" + +namespace arrow { +namespace io { + +std::vector RandomData(int64_t size) { + std::vector buffer(size); + test::random_bytes(size, 0, buffer.data()); + return buffer; +} + +class TestHdfsClient : public ::testing::Test { + public: + Status MakeScratchDir() { + if (client_->Exists(scratch_dir_)) { + RETURN_NOT_OK((client_->Delete(scratch_dir_, true))); + } + return client_->CreateDirectory(scratch_dir_); + } + + Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size, + bool append = false, int buffer_size = 0, int replication = 0, + int default_block_size = 0) { + std::shared_ptr file; + RETURN_NOT_OK(client_->OpenWriteable( + path, append, buffer_size, replication, default_block_size, &file)); + + RETURN_NOT_OK(file->Write(buffer, size)); + RETURN_NOT_OK(file->Close()); + + return Status::OK(); + } + + std::string ScratchPath(const std::string& name) { + std::stringstream ss; + ss << scratch_dir_ << "/" << name; + return ss.str(); + } + + std::string HdfsAbsPath(const std::string& relpath) { + std::stringstream ss; + ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath; + return ss.str(); + } + + protected: + // Set up shared state between unit tests + static void SetUpTestCase() { + if (!ConnectLibHdfs().ok()) { + std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl; + return; + } + + loaded_libhdfs_ = true; + + const char* host = std::getenv("ARROW_HDFS_TEST_HOST"); + const char* port = std::getenv("ARROW_HDFS_TEST_PORT"); + const char* user = std::getenv("ARROW_HDFS_TEST_USER"); + + ASSERT_TRUE(user) << "Set ARROW_HDFS_TEST_USER"; + + conf_.host = host == nullptr ? "localhost" : host; + conf_.user = user; + conf_.port = port == nullptr ? 
20500 : atoi(port); + + ASSERT_OK(HdfsClient::Connect(&conf_, &client_)); + } + + static void TearDownTestCase() { + if (client_) { + EXPECT_OK(client_->Delete(scratch_dir_, true)); + EXPECT_OK(client_->Disconnect()); + } + } + + static bool loaded_libhdfs_; + + // Resources shared amongst unit tests + static HdfsConnectionConfig conf_; + static std::string scratch_dir_; + static std::shared_ptr client_; +}; + +bool TestHdfsClient::loaded_libhdfs_ = false; +HdfsConnectionConfig TestHdfsClient::conf_ = HdfsConnectionConfig(); + +std::string TestHdfsClient::scratch_dir_ = + boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native(); + +std::shared_ptr TestHdfsClient::client_ = nullptr; + +#define SKIP_IF_NO_LIBHDFS() \ + if (!loaded_libhdfs_) { \ + std::cout << "No libhdfs, skipping" << std::endl; \ + return; \ + } + +TEST_F(TestHdfsClient, ConnectsAgain) { + SKIP_IF_NO_LIBHDFS(); + + std::shared_ptr client; + ASSERT_OK(HdfsClient::Connect(&conf_, &client)); + ASSERT_OK(client->Disconnect()); +} + +TEST_F(TestHdfsClient, CreateDirectory) { + SKIP_IF_NO_LIBHDFS(); + + std::string path = ScratchPath("create-directory"); + + if (client_->Exists(path)) { ASSERT_OK(client_->Delete(path, true)); } + + ASSERT_OK(client_->CreateDirectory(path)); + ASSERT_TRUE(client_->Exists(path)); + EXPECT_OK(client_->Delete(path, true)); + ASSERT_FALSE(client_->Exists(path)); +} + +TEST_F(TestHdfsClient, GetCapacityUsed) { + SKIP_IF_NO_LIBHDFS(); + + // Who knows what is actually in your DFS cluster, but expect it to have + // positive used bytes and capacity + int64_t nbytes = 0; + ASSERT_OK(client_->GetCapacity(&nbytes)); + ASSERT_LT(0, nbytes); + + ASSERT_OK(client_->GetUsed(&nbytes)); + ASSERT_LT(0, nbytes); +} + +TEST_F(TestHdfsClient, GetPathInfo) { + SKIP_IF_NO_LIBHDFS(); + + HdfsPathInfo info; + + ASSERT_OK(MakeScratchDir()); + + // Directory info + ASSERT_OK(client_->GetPathInfo(scratch_dir_, &info)); + ASSERT_EQ(ObjectType::DIRECTORY, info.kind); + ASSERT_EQ(HdfsAbsPath(scratch_dir_), info.name); + ASSERT_EQ(conf_.user, info.owner); + + // TODO(wesm): test group, other attrs + + auto path = ScratchPath("test-file"); + + const int size = 100; + + std::vector buffer = RandomData(size); + + ASSERT_OK(WriteDummyFile(path, buffer.data(), size)); + ASSERT_OK(client_->GetPathInfo(path, &info)); + + ASSERT_EQ(ObjectType::FILE, info.kind); + ASSERT_EQ(HdfsAbsPath(path), info.name); + ASSERT_EQ(conf_.user, info.owner); + ASSERT_EQ(size, info.size); +} + +TEST_F(TestHdfsClient, AppendToFile) { + SKIP_IF_NO_LIBHDFS(); + + ASSERT_OK(MakeScratchDir()); + + auto path = ScratchPath("test-file"); + const int size = 100; + + std::vector buffer = RandomData(size); + ASSERT_OK(WriteDummyFile(path, buffer.data(), size)); + + // now append + ASSERT_OK(WriteDummyFile(path, buffer.data(), size, true)); + + HdfsPathInfo info; + ASSERT_OK(client_->GetPathInfo(path, &info)); + ASSERT_EQ(size * 2, info.size); +} + +TEST_F(TestHdfsClient, ListDirectory) { + SKIP_IF_NO_LIBHDFS(); + + const int size = 100; + std::vector data = RandomData(size); + + auto p1 = ScratchPath("test-file-1"); + auto p2 = ScratchPath("test-file-2"); + auto d1 = ScratchPath("test-dir-1"); + + ASSERT_OK(MakeScratchDir()); + ASSERT_OK(WriteDummyFile(p1, data.data(), size)); + ASSERT_OK(WriteDummyFile(p2, data.data(), size / 2)); + ASSERT_OK(client_->CreateDirectory(d1)); + + std::vector listing; + ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing)); + + ASSERT_EQ(3, listing.size()); + + // Argh, well, shouldn't expect the listing to be in any 
particular order + for (size_t i = 0; i < listing.size(); ++i) { + const HdfsPathInfo& info = listing[i]; + if (info.name == HdfsAbsPath(p1)) { + ASSERT_EQ(ObjectType::FILE, info.kind); + ASSERT_EQ(size, info.size); + } else if (info.name == HdfsAbsPath(p2)) { + ASSERT_EQ(ObjectType::FILE, info.kind); + ASSERT_EQ(size / 2, info.size); + } else if (info.name == HdfsAbsPath(d1)) { + ASSERT_EQ(ObjectType::DIRECTORY, info.kind); + } else { + FAIL() << "Unexpected path: " << info.name; + } + } +} + +TEST_F(TestHdfsClient, ReadableMethods) { + SKIP_IF_NO_LIBHDFS(); + + ASSERT_OK(MakeScratchDir()); + + auto path = ScratchPath("test-file"); + const int size = 100; + + std::vector data = RandomData(size); + ASSERT_OK(WriteDummyFile(path, data.data(), size)); + + std::shared_ptr file; + ASSERT_OK(client_->OpenReadable(path, &file)); + + // Test GetSize -- move this into its own unit test if ever needed + int64_t file_size; + ASSERT_OK(file->GetSize(&file_size)); + ASSERT_EQ(size, file_size); + + uint8_t buffer[50]; + int32_t bytes_read = 0; + + ASSERT_OK(file->Read(50, &bytes_read, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50)); + ASSERT_EQ(50, bytes_read); + + ASSERT_OK(file->Read(50, &bytes_read, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50)); + ASSERT_EQ(50, bytes_read); + + // EOF + ASSERT_OK(file->Read(1, &bytes_read, buffer)); + ASSERT_EQ(0, bytes_read); + + // ReadAt to EOF + ASSERT_OK(file->ReadAt(60, 100, &bytes_read, buffer)); + ASSERT_EQ(40, bytes_read); + ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, bytes_read)); + + // Seek, Tell + ASSERT_OK(file->Seek(60)); + + int64_t position; + ASSERT_OK(file->Tell(&position)); + ASSERT_EQ(60, position); +} + +TEST_F(TestHdfsClient, RenameFile) { + SKIP_IF_NO_LIBHDFS(); + + ASSERT_OK(MakeScratchDir()); + + auto src_path = ScratchPath("src-file"); + auto dst_path = ScratchPath("dst-file"); + const int size = 100; + + std::vector data = RandomData(size); + ASSERT_OK(WriteDummyFile(src_path, data.data(), size)); + + ASSERT_OK(client_->Rename(src_path, dst_path)); + + ASSERT_FALSE(client_->Exists(src_path)); + ASSERT_TRUE(client_->Exists(dst_path)); +} + +} // namespace io +} // namespace arrow diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc new file mode 100644 index 0000000000000..c868515a135d7 --- /dev/null +++ b/cpp/src/arrow/io/hdfs.cc @@ -0,0 +1,458 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
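The test fixture above reads its connection settings from environment variables, defaulting to `localhost:20500` when host and port are unset and requiring the user to be specified. A minimal environment for running `hdfs-io-test` against a local cluster might look like the following (values are illustrative):

```shell
export ARROW_HDFS_TEST_HOST=localhost
export ARROW_HDFS_TEST_PORT=20500
export ARROW_HDFS_TEST_USER=$USER   # required; the test asserts this is set
```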
+ +#include + +#include +#include +#include + +#include "arrow/io/hdfs.h" +#include "arrow/util/status.h" + +namespace arrow { +namespace io { + +#define CHECK_FAILURE(RETURN_VALUE, WHAT) \ + do { \ + if (RETURN_VALUE == -1) { \ + std::stringstream ss; \ + ss << "HDFS: " << WHAT << " failed"; \ + return Status::IOError(ss.str()); \ + } \ + } while (0) + +static Status CheckReadResult(int ret) { + // Check for error on -1 (possibly errno set) + + // ret == 0 at end of file, which is OK + if (ret == -1) { + // EOF + std::stringstream ss; + ss << "HDFS read failed, errno: " << errno; + return Status::IOError(ss.str()); + } + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// File reading + +class HdfsAnyFileImpl { + public: + void set_members(const std::string& path, hdfsFS fs, hdfsFile handle) { + path_ = path; + fs_ = fs; + file_ = handle; + is_open_ = true; + } + + Status Seek(int64_t position) { + int ret = hdfsSeek(fs_, file_, position); + CHECK_FAILURE(ret, "seek"); + return Status::OK(); + } + + Status Tell(int64_t* offset) { + int64_t ret = hdfsTell(fs_, file_); + CHECK_FAILURE(ret, "tell"); + *offset = ret; + return Status::OK(); + } + + bool is_open() const { return is_open_; } + + protected: + std::string path_; + + // These are pointers in libhdfs, so OK to copy + hdfsFS fs_; + hdfsFile file_; + + bool is_open_; +}; + +// Private implementation for read-only files +class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { + public: + HdfsReadableFileImpl() {} + + Status Close() { + if (is_open_) { + int ret = hdfsCloseFile(fs_, file_); + CHECK_FAILURE(ret, "CloseFile"); + is_open_ = false; + } + return Status::OK(); + } + + Status ReadAt(int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) { + tSize ret = hdfsPread(fs_, file_, static_cast(position), + reinterpret_cast(buffer), nbytes); + RETURN_NOT_OK(CheckReadResult(ret)); + *bytes_read = ret; + return Status::OK(); + } + + Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) { + tSize ret = hdfsRead(fs_, file_, reinterpret_cast(buffer), nbytes); + RETURN_NOT_OK(CheckReadResult(ret)); + *bytes_read = ret; + return Status::OK(); + } + + Status GetSize(int64_t* size) { + hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path_.c_str()); + if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); } + + *size = entry->mSize; + hdfsFreeFileInfo(entry, 1); + return Status::OK(); + } +}; + +HdfsReadableFile::HdfsReadableFile() { + impl_.reset(new HdfsReadableFileImpl()); +} + +HdfsReadableFile::~HdfsReadableFile() { + impl_->Close(); +} + +Status HdfsReadableFile::Close() { + return impl_->Close(); +} + +Status HdfsReadableFile::ReadAt( + int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) { + return impl_->ReadAt(position, nbytes, bytes_read, buffer); +} + +Status HdfsReadableFile::Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) { + return impl_->Read(nbytes, bytes_read, buffer); +} + +Status HdfsReadableFile::GetSize(int64_t* size) { + return impl_->GetSize(size); +} + +Status HdfsReadableFile::Seek(int64_t position) { + return impl_->Seek(position); +} + +Status HdfsReadableFile::Tell(int64_t* position) { + return impl_->Tell(position); +} + +// ---------------------------------------------------------------------- +// File writing + +// Private implementation for writeable-only files +class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl { + public: + HdfsWriteableFileImpl() 
{} + + Status Close() { + if (is_open_) { + int ret = hdfsFlush(fs_, file_); + CHECK_FAILURE(ret, "Flush"); + ret = hdfsCloseFile(fs_, file_); + CHECK_FAILURE(ret, "CloseFile"); + is_open_ = false; + } + return Status::OK(); + } + + Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written) { + tSize ret = hdfsWrite(fs_, file_, reinterpret_cast(buffer), nbytes); + CHECK_FAILURE(ret, "Write"); + *bytes_written = ret; + return Status::OK(); + } +}; + +HdfsWriteableFile::HdfsWriteableFile() { + impl_.reset(new HdfsWriteableFileImpl()); +} + +HdfsWriteableFile::~HdfsWriteableFile() { + impl_->Close(); +} + +Status HdfsWriteableFile::Close() { + return impl_->Close(); +} + +Status HdfsWriteableFile::Write( + const uint8_t* buffer, int32_t nbytes, int32_t* bytes_read) { + return impl_->Write(buffer, nbytes, bytes_read); +} + +Status HdfsWriteableFile::Write(const uint8_t* buffer, int32_t nbytes) { + int32_t bytes_written_dummy = 0; + return Write(buffer, nbytes, &bytes_written_dummy); +} + +Status HdfsWriteableFile::Tell(int64_t* position) { + return impl_->Tell(position); +} + +// ---------------------------------------------------------------------- +// HDFS client + +// TODO(wesm): this could throw std::bad_alloc in the course of copying strings +// into the path info object +static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) { + out->kind = input->mKind == kObjectKindFile ? ObjectType::FILE : ObjectType::DIRECTORY; + out->name = std::string(input->mName); + out->owner = std::string(input->mOwner); + out->group = std::string(input->mGroup); + + out->last_access_time = static_cast(input->mLastAccess); + out->last_modified_time = static_cast(input->mLastMod); + out->size = static_cast(input->mSize); + + out->replication = input->mReplication; + out->block_size = input->mBlockSize; + + out->permissions = input->mPermissions; +} + +// Private implementation +class HdfsClient::HdfsClientImpl { + public: + HdfsClientImpl() {} + + Status Connect(const HdfsConnectionConfig* config) { + RETURN_NOT_OK(ConnectLibHdfs()); + + fs_ = hdfsConnectAsUser(config->host.c_str(), config->port, config->user.c_str()); + + if (fs_ == nullptr) { return Status::IOError("HDFS connection failed"); } + namenode_host_ = config->host; + port_ = config->port; + user_ = config->user; + + return Status::OK(); + } + + Status CreateDirectory(const std::string& path) { + int ret = hdfsCreateDirectory(fs_, path.c_str()); + CHECK_FAILURE(ret, "create directory"); + return Status::OK(); + } + + Status Delete(const std::string& path, bool recursive) { + int ret = hdfsDelete(fs_, path.c_str(), static_cast(recursive)); + CHECK_FAILURE(ret, "delete"); + return Status::OK(); + } + + Status Disconnect() { + int ret = hdfsDisconnect(fs_); + CHECK_FAILURE(ret, "hdfsFS::Disconnect"); + return Status::OK(); + } + + bool Exists(const std::string& path) { + // hdfsExists does not distinguish between RPC failure and the file not + // existing + int ret = hdfsExists(fs_, path.c_str()); + return ret == 0; + } + + Status GetCapacity(int64_t* nbytes) { + tOffset ret = hdfsGetCapacity(fs_); + CHECK_FAILURE(ret, "GetCapacity"); + *nbytes = ret; + return Status::OK(); + } + + Status GetUsed(int64_t* nbytes) { + tOffset ret = hdfsGetUsed(fs_); + CHECK_FAILURE(ret, "GetUsed"); + *nbytes = ret; + return Status::OK(); + } + + Status GetPathInfo(const std::string& path, HdfsPathInfo* info) { + hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path.c_str()); + + if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo 
failed"); } + + SetPathInfo(entry, info); + hdfsFreeFileInfo(entry, 1); + + return Status::OK(); + } + + Status ListDirectory(const std::string& path, std::vector* listing) { + int num_entries = 0; + hdfsFileInfo* entries = hdfsListDirectory(fs_, path.c_str(), &num_entries); + + if (entries == nullptr) { + // If the directory is empty, entries is NULL but errno is 0. Non-zero + // errno indicates error + // + // Note: errno is thread-locala + if (errno == 0) { num_entries = 0; } + { return Status::IOError("HDFS: list directory failed"); } + } + + // Allocate additional space for elements + + int vec_offset = listing->size(); + listing->resize(vec_offset + num_entries); + + for (int i = 0; i < num_entries; ++i) { + SetPathInfo(entries + i, &(*listing)[i]); + } + + // Free libhdfs file info + hdfsFreeFileInfo(entries, num_entries); + + return Status::OK(); + } + + Status OpenReadable(const std::string& path, std::shared_ptr* file) { + hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, 0, 0, 0); + + if (handle == nullptr) { + // TODO(wesm): determine cause of failure + std::stringstream ss; + ss << "Unable to open file " << path; + return Status::IOError(ss.str()); + } + + // std::make_shared does not work with private ctors + *file = std::shared_ptr(new HdfsReadableFile()); + (*file)->impl_->set_members(path, fs_, handle); + + return Status::OK(); + } + + Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size, + int16_t replication, int64_t default_block_size, + std::shared_ptr* file) { + int flags = O_WRONLY; + if (append) flags |= O_APPEND; + + hdfsFile handle = hdfsOpenFile( + fs_, path.c_str(), flags, buffer_size, replication, default_block_size); + + if (handle == nullptr) { + // TODO(wesm): determine cause of failure + std::stringstream ss; + ss << "Unable to open file " << path; + return Status::IOError(ss.str()); + } + + // std::make_shared does not work with private ctors + *file = std::shared_ptr(new HdfsWriteableFile()); + (*file)->impl_->set_members(path, fs_, handle); + + return Status::OK(); + } + + Status Rename(const std::string& src, const std::string& dst) { + int ret = hdfsRename(fs_, src.c_str(), dst.c_str()); + CHECK_FAILURE(ret, "Rename"); + return Status::OK(); + } + + private: + std::string namenode_host_; + std::string user_; + int port_; + + hdfsFS fs_; +}; + +// ---------------------------------------------------------------------- +// Public API for HDFSClient + +HdfsClient::HdfsClient() { + impl_.reset(new HdfsClientImpl()); +} + +HdfsClient::~HdfsClient() {} + +Status HdfsClient::Connect( + const HdfsConnectionConfig* config, std::shared_ptr* fs) { + // ctor is private, make_shared will not work + *fs = std::shared_ptr(new HdfsClient()); + + RETURN_NOT_OK((*fs)->impl_->Connect(config)); + return Status::OK(); +} + +Status HdfsClient::CreateDirectory(const std::string& path) { + return impl_->CreateDirectory(path); +} + +Status HdfsClient::Delete(const std::string& path, bool recursive) { + return impl_->Delete(path, recursive); +} + +Status HdfsClient::Disconnect() { + return impl_->Disconnect(); +} + +bool HdfsClient::Exists(const std::string& path) { + return impl_->Exists(path); +} + +Status HdfsClient::GetPathInfo(const std::string& path, HdfsPathInfo* info) { + return impl_->GetPathInfo(path, info); +} + +Status HdfsClient::GetCapacity(int64_t* nbytes) { + return impl_->GetCapacity(nbytes); +} + +Status HdfsClient::GetUsed(int64_t* nbytes) { + return impl_->GetUsed(nbytes); +} + +Status HdfsClient::ListDirectory( + const 
std::string& path, std::vector* listing) { + return impl_->ListDirectory(path, listing); +} + +Status HdfsClient::OpenReadable( + const std::string& path, std::shared_ptr* file) { + return impl_->OpenReadable(path, file); +} + +Status HdfsClient::OpenWriteable(const std::string& path, bool append, + int32_t buffer_size, int16_t replication, int64_t default_block_size, + std::shared_ptr* file) { + return impl_->OpenWriteable( + path, append, buffer_size, replication, default_block_size, file); +} + +Status HdfsClient::OpenWriteable( + const std::string& path, bool append, std::shared_ptr* file) { + return OpenWriteable(path, append, 0, 0, 0, file); +} + +Status HdfsClient::Rename(const std::string& src, const std::string& dst) { + return impl_->Rename(src, dst); +} + +} // namespace io +} // namespace arrow diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h new file mode 100644 index 0000000000000..a1972db96157a --- /dev/null +++ b/cpp/src/arrow/io/hdfs.h @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_IO_HDFS +#define ARROW_IO_HDFS + +#include +#include +#include +#include + +#include "arrow/io/interfaces.h" +#include "arrow/util/macros.h" + +namespace arrow { + +class Status; + +namespace io { + +Status ConnectLibHdfs(); + +class HdfsClient; +class HdfsReadableFile; +class HdfsWriteableFile; + +struct HdfsPathInfo { + ObjectType::type kind; + + std::string name; + std::string owner; + std::string group; + + // Access times in UNIX timestamps (seconds) + int64_t size; + int64_t block_size; + + int32_t last_modified_time; + int32_t last_access_time; + + int16_t replication; + int16_t permissions; +}; + +struct HdfsConnectionConfig { + std::string host; + int port; + std::string user; + + // TODO: Kerberos, etc. 
+}; + +class HdfsClient : public FileSystemClient { + public: + ~HdfsClient(); + + // Connect to an HDFS cluster at indicated host, port, and as user + // + // @param host (in) + // @param port (in) + // @param user (in): user to identify as + // @param fs (out): the created client + // @returns Status + static Status Connect( + const HdfsConnectionConfig* config, std::shared_ptr* fs); + + // Create directory and all parents + // + // @param path (in): absolute HDFS path + // @returns Status + Status CreateDirectory(const std::string& path); + + // Delete file or directory + // @param path: absolute path to data + // @param recursive: if path is a directory, delete contents as well + // @returns error status on failure + Status Delete(const std::string& path, bool recursive = false); + + // Disconnect from cluster + // + // @returns Status + Status Disconnect(); + + // @param path (in): absolute HDFS path + // @returns bool, true if the path exists, false if not (or on error) + bool Exists(const std::string& path); + + // @param path (in): absolute HDFS path + // @param info (out) + // @returns Status + Status GetPathInfo(const std::string& path, HdfsPathInfo* info); + + // @param nbytes (out): total capacity of the filesystem + // @returns Status + Status GetCapacity(int64_t* nbytes); + + // @param nbytes (out): total bytes used of the filesystem + // @returns Status + Status GetUsed(int64_t* nbytes); + + Status ListDirectory(const std::string& path, std::vector* listing); + + // @param path file path to change + // @param owner pass nullptr for no change + // @param group pass nullptr for no change + Status Chown(const std::string& path, const char* owner, const char* group); + + Status Chmod(const std::string& path, int mode); + + // Move file or directory from source path to destination path within the + // current filesystem + Status Rename(const std::string& src, const std::string& dst); + + // TODO(wesm): GetWorkingDirectory, SetWorkingDirectory + + // Open an HDFS file in READ mode. Returns error + // status if the file is not found. 
+ // + // @param path complete file path + Status OpenReadable(const std::string& path, std::shared_ptr* file); + + // FileMode::WRITE options + // @param path complete file path + // @param buffer_size, 0 for default + // @param replication, 0 for default + // @param default_block_size, 0 for default + Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size, + int16_t replication, int64_t default_block_size, + std::shared_ptr* file); + + Status OpenWriteable( + const std::string& path, bool append, std::shared_ptr* file); + + private: + friend class HdfsReadableFile; + friend class HdfsWriteableFile; + + class HdfsClientImpl; + std::unique_ptr impl_; + + HdfsClient(); + DISALLOW_COPY_AND_ASSIGN(HdfsClient); +}; + +class HdfsReadableFile : public RandomAccessFile { + public: + ~HdfsReadableFile(); + + Status Close() override; + + Status GetSize(int64_t* size) override; + + Status ReadAt( + int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override; + + Status Seek(int64_t position) override; + Status Tell(int64_t* position) override; + + // NOTE: If you wish to read a particular range of a file in a multithreaded + // context, you may prefer to use ReadAt to avoid locking issues + Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override; + + private: + class HdfsReadableFileImpl; + std::unique_ptr impl_; + + friend class HdfsClient::HdfsClientImpl; + + HdfsReadableFile(); + DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile); +}; + +class HdfsWriteableFile : public WriteableFile { + public: + ~HdfsWriteableFile(); + + Status Close() override; + + Status Write(const uint8_t* buffer, int32_t nbytes) override; + + Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written); + + Status Tell(int64_t* position) override; + + private: + class HdfsWriteableFileImpl; + std::unique_ptr impl_; + + friend class HdfsClient::HdfsClientImpl; + + HdfsWriteableFile(); + + DISALLOW_COPY_AND_ASSIGN(HdfsWriteableFile); +}; + +} // namespace io +} // namespace arrow + +#endif // ARROW_IO_HDFS diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h new file mode 100644 index 0000000000000..4bd8a8ffc2f9d --- /dev/null +++ b/cpp/src/arrow/io/interfaces.h @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef ARROW_IO_INTERFACES +#define ARROW_IO_INTERFACES + +#include + +namespace arrow { + +class Status; + +namespace io { + +struct FileMode { + enum type { READ, WRITE, READWRITE }; +}; + +struct ObjectType { + enum type { FILE, DIRECTORY }; +}; + +class FileSystemClient { + public: + virtual ~FileSystemClient() {} +}; + +class FileBase { + virtual Status Close() = 0; + + virtual Status Tell(int64_t* position) = 0; +}; + +class ReadableFile : public FileBase { + public: + virtual Status ReadAt( + int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0; + + virtual Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0; + + virtual Status GetSize(int64_t* size) = 0; +}; + +class RandomAccessFile : public ReadableFile { + public: + virtual Status Seek(int64_t position) = 0; +}; + +class WriteableFile : public FileBase { + public: + virtual Status Write(const uint8_t* buffer, int32_t nbytes) = 0; +}; + +} // namespace io +} // namespace arrow + +#endif // ARROW_IO_INTERFACES diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc new file mode 100644 index 0000000000000..f75266536e5b3 --- /dev/null +++ b/cpp/src/arrow/io/libhdfs_shim.cc @@ -0,0 +1,544 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This shim interface to libhdfs (for runtime shared library loading) has been +// adapted from the SFrame project, released under the ASF-compatible 3-clause +// BSD license +// +// Using this required having the $JAVA_HOME and $HADOOP_HOME environment +// variables set, so that libjvm and libhdfs can be located easily + +// Copyright (C) 2015 Dato, Inc. +// All rights reserved. +// +// This software may be modified and distributed under the terms +// of the BSD license. See the LICENSE file for details. 
+ +#ifdef HAS_HADOOP + +#ifndef _WIN32 +#include +#else +#include +#include + +// TODO(wesm): address when/if we add windows support +// #include +#endif + +extern "C" { +#include +} + +#include +#include +#include +#include +#include +#include + +#include // NOLINT +#include // NOLINT + +#include "arrow/util/status.h" + +namespace fs = boost::filesystem; + +extern "C" { + +#ifndef _WIN32 +static void* libhdfs_handle = NULL; +static void* libjvm_handle = NULL; +#else +static HINSTANCE libhdfs_handle = NULL; +static HINSTANCE libjvm_handle = NULL; +#endif +/* + * All the shim pointers + */ + +// NOTE(wesm): cpplint does not like use of short and other imprecise C types + +static hdfsFS (*ptr_hdfsConnectAsUser)( + const char* host, tPort port, const char* user) = NULL; +static hdfsFS (*ptr_hdfsConnect)(const char* host, tPort port) = NULL; +static int (*ptr_hdfsDisconnect)(hdfsFS fs) = NULL; + +static hdfsFile (*ptr_hdfsOpenFile)(hdfsFS fs, const char* path, int flags, + int bufferSize, short replication, tSize blocksize) = NULL; // NOLINT + +static int (*ptr_hdfsCloseFile)(hdfsFS fs, hdfsFile file) = NULL; +static int (*ptr_hdfsExists)(hdfsFS fs, const char* path) = NULL; +static int (*ptr_hdfsSeek)(hdfsFS fs, hdfsFile file, tOffset desiredPos) = NULL; +static tOffset (*ptr_hdfsTell)(hdfsFS fs, hdfsFile file) = NULL; +static tSize (*ptr_hdfsRead)(hdfsFS fs, hdfsFile file, void* buffer, tSize length) = NULL; +static tSize (*ptr_hdfsPread)( + hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) = NULL; +static tSize (*ptr_hdfsWrite)( + hdfsFS fs, hdfsFile file, const void* buffer, tSize length) = NULL; +static int (*ptr_hdfsFlush)(hdfsFS fs, hdfsFile file) = NULL; +static int (*ptr_hdfsAvailable)(hdfsFS fs, hdfsFile file) = NULL; +static int (*ptr_hdfsCopy)( + hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) = NULL; +static int (*ptr_hdfsMove)( + hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) = NULL; +static int (*ptr_hdfsDelete)(hdfsFS fs, const char* path, int recursive) = NULL; +static int (*ptr_hdfsRename)(hdfsFS fs, const char* oldPath, const char* newPath) = NULL; +static char* (*ptr_hdfsGetWorkingDirectory)( + hdfsFS fs, char* buffer, size_t bufferSize) = NULL; +static int (*ptr_hdfsSetWorkingDirectory)(hdfsFS fs, const char* path) = NULL; +static int (*ptr_hdfsCreateDirectory)(hdfsFS fs, const char* path) = NULL; +static int (*ptr_hdfsSetReplication)( + hdfsFS fs, const char* path, int16_t replication) = NULL; +static hdfsFileInfo* (*ptr_hdfsListDirectory)( + hdfsFS fs, const char* path, int* numEntries) = NULL; +static hdfsFileInfo* (*ptr_hdfsGetPathInfo)(hdfsFS fs, const char* path) = NULL; +static void (*ptr_hdfsFreeFileInfo)(hdfsFileInfo* hdfsFileInfo, int numEntries) = NULL; +static char*** (*ptr_hdfsGetHosts)( + hdfsFS fs, const char* path, tOffset start, tOffset length) = NULL; +static void (*ptr_hdfsFreeHosts)(char*** blockHosts) = NULL; +static tOffset (*ptr_hdfsGetDefaultBlockSize)(hdfsFS fs) = NULL; +static tOffset (*ptr_hdfsGetCapacity)(hdfsFS fs) = NULL; +static tOffset (*ptr_hdfsGetUsed)(hdfsFS fs) = NULL; +static int (*ptr_hdfsChown)( + hdfsFS fs, const char* path, const char* owner, const char* group) = NULL; +static int (*ptr_hdfsChmod)(hdfsFS fs, const char* path, short mode) = NULL; // NOLINT +static int (*ptr_hdfsUtime)(hdfsFS fs, const char* path, tTime mtime, tTime atime) = NULL; + +// Helper functions for dlopens +static std::vector get_potential_libjvm_paths(); +static std::vector get_potential_libhdfs_paths(); 
+static arrow::Status try_dlopen(std::vector potential_paths, const char* name, +#ifndef _WIN32 + void*& out_handle); +#else + HINSTANCE& out_handle); +#endif + +#define GET_SYMBOL(SYMBOL_NAME) \ + if (!ptr_##SYMBOL_NAME) { \ + *reinterpret_cast(&ptr_##SYMBOL_NAME) = get_symbol("" #SYMBOL_NAME); \ + } + +static void* get_symbol(const char* symbol) { + if (libhdfs_handle == NULL) return NULL; +#ifndef _WIN32 + return dlsym(libhdfs_handle, symbol); +#else + + void* ret = reinterpret_cast(GetProcAddress(libhdfs_handle, symbol)); + if (ret == NULL) { + // logstream(LOG_INFO) << "GetProcAddress error: " + // << get_last_err_str(GetLastError()) << std::endl; + } + return ret; +#endif +} + +hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char* user) { + return ptr_hdfsConnectAsUser(host, port, user); +} + +// Returns NULL on failure +hdfsFS hdfsConnect(const char* host, tPort port) { + if (ptr_hdfsConnect) { + return ptr_hdfsConnect(host, port); + } else { + // TODO: error reporting when shim setup fails + return NULL; + } +} + +int hdfsDisconnect(hdfsFS fs) { + return ptr_hdfsDisconnect(fs); +} + +hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, int bufferSize, + short replication, tSize blocksize) { // NOLINT + return ptr_hdfsOpenFile(fs, path, flags, bufferSize, replication, blocksize); +} + +int hdfsCloseFile(hdfsFS fs, hdfsFile file) { + return ptr_hdfsCloseFile(fs, file); +} + +int hdfsExists(hdfsFS fs, const char* path) { + return ptr_hdfsExists(fs, path); +} + +int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { + return ptr_hdfsSeek(fs, file, desiredPos); +} + +tOffset hdfsTell(hdfsFS fs, hdfsFile file) { + return ptr_hdfsTell(fs, file); +} + +tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length) { + return ptr_hdfsRead(fs, file, buffer, length); +} + +tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) { + return ptr_hdfsPread(fs, file, position, buffer, length); +} + +tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) { + return ptr_hdfsWrite(fs, file, buffer, length); +} + +int hdfsFlush(hdfsFS fs, hdfsFile file) { + return ptr_hdfsFlush(fs, file); +} + +int hdfsAvailable(hdfsFS fs, hdfsFile file) { + GET_SYMBOL(hdfsAvailable); + if (ptr_hdfsAvailable) + return ptr_hdfsAvailable(fs, file); + else + return 0; +} + +int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { + GET_SYMBOL(hdfsCopy); + if (ptr_hdfsCopy) + return ptr_hdfsCopy(srcFS, src, dstFS, dst); + else + return 0; +} + +int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { + GET_SYMBOL(hdfsMove); + if (ptr_hdfsMove) + return ptr_hdfsMove(srcFS, src, dstFS, dst); + else + return 0; +} + +int hdfsDelete(hdfsFS fs, const char* path, int recursive) { + return ptr_hdfsDelete(fs, path, recursive); +} + +int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) { + GET_SYMBOL(hdfsRename); + if (ptr_hdfsRename) + return ptr_hdfsRename(fs, oldPath, newPath); + else + return 0; +} + +char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) { + GET_SYMBOL(hdfsGetWorkingDirectory); + if (ptr_hdfsGetWorkingDirectory) { + return ptr_hdfsGetWorkingDirectory(fs, buffer, bufferSize); + } else { + return NULL; + } +} + +int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) { + GET_SYMBOL(hdfsSetWorkingDirectory); + if (ptr_hdfsSetWorkingDirectory) { + return ptr_hdfsSetWorkingDirectory(fs, path); + } else { + return 0; + } +} + +int 
hdfsCreateDirectory(hdfsFS fs, const char* path) { + return ptr_hdfsCreateDirectory(fs, path); +} + +int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) { + GET_SYMBOL(hdfsSetReplication); + if (ptr_hdfsSetReplication) { + return ptr_hdfsSetReplication(fs, path, replication); + } else { + return 0; + } +} + +hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int* numEntries) { + return ptr_hdfsListDirectory(fs, path, numEntries); +} + +hdfsFileInfo* hdfsGetPathInfo(hdfsFS fs, const char* path) { + return ptr_hdfsGetPathInfo(fs, path); +} + +void hdfsFreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) { + ptr_hdfsFreeFileInfo(hdfsFileInfo, numEntries); +} + +char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) { + GET_SYMBOL(hdfsGetHosts); + if (ptr_hdfsGetHosts) { + return ptr_hdfsGetHosts(fs, path, start, length); + } else { + return NULL; + } +} + +void hdfsFreeHosts(char*** blockHosts) { + GET_SYMBOL(hdfsFreeHosts); + if (ptr_hdfsFreeHosts) { ptr_hdfsFreeHosts(blockHosts); } +} + +tOffset hdfsGetDefaultBlockSize(hdfsFS fs) { + GET_SYMBOL(hdfsGetDefaultBlockSize); + if (ptr_hdfsGetDefaultBlockSize) { + return ptr_hdfsGetDefaultBlockSize(fs); + } else { + return 0; + } +} + +tOffset hdfsGetCapacity(hdfsFS fs) { + return ptr_hdfsGetCapacity(fs); +} + +tOffset hdfsGetUsed(hdfsFS fs) { + return ptr_hdfsGetUsed(fs); +} + +int hdfsChown(hdfsFS fs, const char* path, const char* owner, const char* group) { + GET_SYMBOL(hdfsChown); + if (ptr_hdfsChown) { + return ptr_hdfsChown(fs, path, owner, group); + } else { + return 0; + } +} + +int hdfsChmod(hdfsFS fs, const char* path, short mode) { // NOLINT + GET_SYMBOL(hdfsChmod); + if (ptr_hdfsChmod) { + return ptr_hdfsChmod(fs, path, mode); + } else { + return 0; + } +} + +int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { + GET_SYMBOL(hdfsUtime); + if (ptr_hdfsUtime) { + return ptr_hdfsUtime(fs, path, mtime, atime); + } else { + return 0; + } +} + +static std::vector get_potential_libhdfs_paths() { + std::vector libhdfs_potential_paths = { + // find one in the local directory + fs::path("./libhdfs.so"), fs::path("./hdfs.dll"), + // find a global libhdfs.so + fs::path("libhdfs.so"), fs::path("hdfs.dll"), + }; + + const char* hadoop_home = std::getenv("HADOOP_HOME"); + if (hadoop_home != nullptr) { + auto path = fs::path(hadoop_home) / "lib/native/libhdfs.so"; + libhdfs_potential_paths.push_back(path); + } + return libhdfs_potential_paths; +} + +static std::vector get_potential_libjvm_paths() { + std::vector libjvm_potential_paths; + + std::vector search_prefixes; + std::vector search_suffixes; + std::string file_name; + +// From heuristics +#ifdef __WIN32 + search_prefixes = {""}; + search_suffixes = {"/jre/bin/server", "/bin/server"}; + file_name = "jvm.dll"; +#elif __APPLE__ + search_prefixes = {""}; + search_suffixes = {""}; + file_name = "libjvm.dylib"; + +// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are +// expecting users to set an environment variable +#else + search_prefixes = { + "/usr/lib/jvm/default-java", // ubuntu / debian distros + "/usr/lib/jvm/java", // rhel6 + "/usr/lib/jvm", // centos6 + "/usr/lib64/jvm", // opensuse 13 + "/usr/local/lib/jvm/default-java", // alt ubuntu / debian distros + "/usr/local/lib/jvm/java", // alt rhel6 + "/usr/local/lib/jvm", // alt centos6 + "/usr/local/lib64/jvm", // alt opensuse 13 + "/usr/local/lib/jvm/java-7-openjdk-amd64", // alt ubuntu / debian distros + "/usr/lib/jvm/java-7-openjdk-amd64", // 
alt ubuntu / debian distros + "/usr/local/lib/jvm/java-6-openjdk-amd64", // alt ubuntu / debian distros + "/usr/lib/jvm/java-6-openjdk-amd64", // alt ubuntu / debian distros + "/usr/lib/jvm/java-7-oracle", // alt ubuntu + "/usr/lib/jvm/java-8-oracle", // alt ubuntu + "/usr/lib/jvm/java-6-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-7-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-8-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-6-oracle", // alt ubuntu + "/usr/lib/jvm/default", // alt centos + "/usr/java/latest", // alt centos + }; + search_suffixes = {"/jre/lib/amd64/server"}; + file_name = "libjvm.so"; +#endif + // From direct environment variable + char* env_value = NULL; + if ((env_value = getenv("JAVA_HOME")) != NULL) { + // logstream(LOG_INFO) << "Found environment variable " << env_name << ": " << + // env_value << std::endl; + search_prefixes.insert(search_prefixes.begin(), env_value); + } + + // Generate cross product between search_prefixes, search_suffixes, and file_name + for (auto& prefix : search_prefixes) { + for (auto& suffix : search_suffixes) { + auto path = (fs::path(prefix) / fs::path(suffix) / fs::path(file_name)); + libjvm_potential_paths.push_back(path); + } + } + + return libjvm_potential_paths; +} + +#ifndef _WIN32 +static arrow::Status try_dlopen( + std::vector potential_paths, const char* name, void*& out_handle) { + std::vector error_messages; + + for (auto& i : potential_paths) { + i.make_preferred(); + // logstream(LOG_INFO) << "Trying " << i.string().c_str() << std::endl; + out_handle = dlopen(i.native().c_str(), RTLD_NOW | RTLD_LOCAL); + + if (out_handle != NULL) { + // logstream(LOG_INFO) << "Success!" << std::endl; + break; + } else { + const char* err_msg = dlerror(); + if (err_msg != NULL) { + error_messages.push_back(std::string(err_msg)); + } else { + error_messages.push_back(std::string(" returned NULL")); + } + } + } + + if (out_handle == NULL) { + std::stringstream ss; + ss << "Unable to load " << name; + return arrow::Status::IOError(ss.str()); + } + + return arrow::Status::OK(); +} + +#else +static arrow::Status try_dlopen( + std::vector potential_paths, const char* name, HINSTANCE& out_handle) { + std::vector error_messages; + + for (auto& i : potential_paths) { + i.make_preferred(); + // logstream(LOG_INFO) << "Trying " << i.string().c_str() << std::endl; + + out_handle = LoadLibrary(i.string().c_str()); + + if (out_handle != NULL) { + // logstream(LOG_INFO) << "Success!" 
<< std::endl; + break; + } else { + // error_messages.push_back(get_last_err_str(GetLastError())); + } + } + + if (out_handle == NULL) { + std::stringstream ss; + ss << "Unable to load " << name; + return arrow::Status::IOError(ss.str()); + } + + return arrow::Status::OK(); +} +#endif // _WIN32 + +} // extern "C" + +#define GET_SYMBOL_REQUIRED(SYMBOL_NAME) \ + do { \ + if (!ptr_##SYMBOL_NAME) { \ + *reinterpret_cast(&ptr_##SYMBOL_NAME) = get_symbol("" #SYMBOL_NAME); \ + } \ + if (!ptr_##SYMBOL_NAME) \ + return Status::IOError("Getting symbol " #SYMBOL_NAME "failed"); \ + } while (0) + +namespace arrow { +namespace io { + +Status ConnectLibHdfs() { + static std::mutex lock; + std::lock_guard guard(lock); + + static bool shim_attempted = false; + if (!shim_attempted) { + shim_attempted = true; + + std::vector libjvm_potential_paths = get_potential_libjvm_paths(); + RETURN_NOT_OK(try_dlopen(libjvm_potential_paths, "libjvm", libjvm_handle)); + + std::vector libhdfs_potential_paths = get_potential_libhdfs_paths(); + RETURN_NOT_OK(try_dlopen(libhdfs_potential_paths, "libhdfs", libhdfs_handle)); + } else if (libhdfs_handle == nullptr) { + return Status::IOError("Prior attempt to load libhdfs failed"); + } + + GET_SYMBOL_REQUIRED(hdfsConnect); + GET_SYMBOL_REQUIRED(hdfsConnectAsUser); + GET_SYMBOL_REQUIRED(hdfsCreateDirectory); + GET_SYMBOL_REQUIRED(hdfsDelete); + GET_SYMBOL_REQUIRED(hdfsDisconnect); + GET_SYMBOL_REQUIRED(hdfsExists); + GET_SYMBOL_REQUIRED(hdfsFreeFileInfo); + GET_SYMBOL_REQUIRED(hdfsGetCapacity); + GET_SYMBOL_REQUIRED(hdfsGetUsed); + GET_SYMBOL_REQUIRED(hdfsGetPathInfo); + GET_SYMBOL_REQUIRED(hdfsListDirectory); + + // File methods + GET_SYMBOL_REQUIRED(hdfsCloseFile); + GET_SYMBOL_REQUIRED(hdfsFlush); + GET_SYMBOL_REQUIRED(hdfsOpenFile); + GET_SYMBOL_REQUIRED(hdfsRead); + GET_SYMBOL_REQUIRED(hdfsPread); + GET_SYMBOL_REQUIRED(hdfsSeek); + GET_SYMBOL_REQUIRED(hdfsTell); + GET_SYMBOL_REQUIRED(hdfsWrite); + + return Status::OK(); +} + +} // namespace io +} // namespace arrow + +#endif // HAS_HADOOP diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index db779d8309cf6..edcac88705668 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -126,8 +126,8 @@ class TestParquetIO : public ::testing::Test { size_t chunk_size = values.size() / num_chunks; for (int i = 0; i < num_chunks; i++) { auto row_group_writer = file_writer->AppendRowGroup(chunk_size); - auto column_writer = static_cast*>( - row_group_writer->NextColumn()); + auto column_writer = + static_cast*>(row_group_writer->NextColumn()); T* data = values.data() + i * chunk_size; column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); column_writer->Close(); diff --git a/cpp/thirdparty/hadoop/include/hdfs.h b/cpp/thirdparty/hadoop/include/hdfs.h new file mode 100644 index 0000000000000..a4df6ae3b2be7 --- /dev/null +++ b/cpp/thirdparty/hadoop/include/hdfs.h @@ -0,0 +1,1024 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBHDFS_HDFS_H +#define LIBHDFS_HDFS_H + +#include /* for EINTERNAL, etc. */ +#include /* for O_RDONLY, O_WRONLY */ +#include /* for uint64_t, etc. */ +#include /* for time_t */ + +/* + * Support export of DLL symbols during libhdfs build, and import of DLL symbols + * during client application build. A client application may optionally define + * symbol LIBHDFS_DLL_IMPORT in its build. This is not strictly required, but + * the compiler can produce more efficient code with it. + */ +#ifdef WIN32 + #ifdef LIBHDFS_DLL_EXPORT + #define LIBHDFS_EXTERNAL __declspec(dllexport) + #elif LIBHDFS_DLL_IMPORT + #define LIBHDFS_EXTERNAL __declspec(dllimport) + #else + #define LIBHDFS_EXTERNAL + #endif +#else + #ifdef LIBHDFS_DLL_EXPORT + #define LIBHDFS_EXTERNAL __attribute__((visibility("default"))) + #elif LIBHDFS_DLL_IMPORT + #define LIBHDFS_EXTERNAL __attribute__((visibility("default"))) + #else + #define LIBHDFS_EXTERNAL + #endif +#endif + +#ifndef O_RDONLY +#define O_RDONLY 1 +#endif + +#ifndef O_WRONLY +#define O_WRONLY 2 +#endif + +#ifndef EINTERNAL +#define EINTERNAL 255 +#endif + +#define ELASTIC_BYTE_BUFFER_POOL_CLASS \ + "org/apache/hadoop/io/ElasticByteBufferPool" + +/** All APIs set errno to meaningful values */ + +#ifdef __cplusplus +extern "C" { +#endif + /** + * Some utility decls used in libhdfs. + */ + struct hdfsBuilder; + typedef int32_t tSize; /// size of data for read/write io ops + typedef time_t tTime; /// time type in seconds + typedef int64_t tOffset;/// offset within the file + typedef uint16_t tPort; /// port + typedef enum tObjectKind { + kObjectKindFile = 'F', + kObjectKindDirectory = 'D', + } tObjectKind; + struct hdfsStreamBuilder; + + + /** + * The C reflection of org.apache.org.hadoop.FileSystem . + */ + struct hdfs_internal; + typedef struct hdfs_internal* hdfsFS; + + struct hdfsFile_internal; + typedef struct hdfsFile_internal* hdfsFile; + + struct hadoopRzOptions; + + struct hadoopRzBuffer; + + /** + * Determine if a file is open for read. + * + * @param file The HDFS file + * @return 1 if the file is open for read; 0 otherwise + */ + LIBHDFS_EXTERNAL + int hdfsFileIsOpenForRead(hdfsFile file); + + /** + * Determine if a file is open for write. + * + * @param file The HDFS file + * @return 1 if the file is open for write; 0 otherwise + */ + LIBHDFS_EXTERNAL + int hdfsFileIsOpenForWrite(hdfsFile file); + + struct hdfsReadStatistics { + uint64_t totalBytesRead; + uint64_t totalLocalBytesRead; + uint64_t totalShortCircuitBytesRead; + uint64_t totalZeroCopyBytesRead; + }; + + /** + * Get read statistics about a file. This is only applicable to files + * opened for reading. + * + * @param file The HDFS file + * @param stats (out parameter) on a successful return, the read + * statistics. Unchanged otherwise. You must free the + * returned statistics with hdfsFileFreeReadStatistics. + * @return 0 if the statistics were successfully returned, + * -1 otherwise. On a failure, please check errno against + * ENOTSUP. webhdfs, LocalFilesystem, and so forth may + * not support read statistics. 
+ */ + LIBHDFS_EXTERNAL + int hdfsFileGetReadStatistics(hdfsFile file, + struct hdfsReadStatistics **stats); + + /** + * @param stats HDFS read statistics for a file. + * + * @return the number of remote bytes read. + */ + LIBHDFS_EXTERNAL + int64_t hdfsReadStatisticsGetRemoteBytesRead( + const struct hdfsReadStatistics *stats); + + /** + * Clear the read statistics for a file. + * + * @param file The file to clear the read statistics of. + * + * @return 0 on success; the error code otherwise. + * EINVAL: the file is not open for reading. + * ENOTSUP: the file does not support clearing the read + * statistics. + * Errno will also be set to this code on failure. + */ + LIBHDFS_EXTERNAL + int hdfsFileClearReadStatistics(hdfsFile file); + + /** + * Free some HDFS read statistics. + * + * @param stats The HDFS read statistics to free. + */ + LIBHDFS_EXTERNAL + void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats); + + /** + * hdfsConnectAsUser - Connect to a hdfs file system as a specific user + * Connect to the hdfs. + * @param nn The NameNode. See hdfsBuilderSetNameNode for details. + * @param port The port on which the server is listening. + * @param user the user name (this is hadoop domain user). Or NULL is equivelant to hhdfsConnect(host, port) + * @return Returns a handle to the filesystem or NULL on error. + * @deprecated Use hdfsBuilderConnect instead. + */ + LIBHDFS_EXTERNAL + hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user); + + /** + * hdfsConnect - Connect to a hdfs file system. + * Connect to the hdfs. + * @param nn The NameNode. See hdfsBuilderSetNameNode for details. + * @param port The port on which the server is listening. + * @return Returns a handle to the filesystem or NULL on error. + * @deprecated Use hdfsBuilderConnect instead. + */ + LIBHDFS_EXTERNAL + hdfsFS hdfsConnect(const char* nn, tPort port); + + /** + * hdfsConnect - Connect to an hdfs file system. + * + * Forces a new instance to be created + * + * @param nn The NameNode. See hdfsBuilderSetNameNode for details. + * @param port The port on which the server is listening. + * @param user The user name to use when connecting + * @return Returns a handle to the filesystem or NULL on error. + * @deprecated Use hdfsBuilderConnect instead. + */ + LIBHDFS_EXTERNAL + hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user ); + + /** + * hdfsConnect - Connect to an hdfs file system. + * + * Forces a new instance to be created + * + * @param nn The NameNode. See hdfsBuilderSetNameNode for details. + * @param port The port on which the server is listening. + * @return Returns a handle to the filesystem or NULL on error. + * @deprecated Use hdfsBuilderConnect instead. + */ + LIBHDFS_EXTERNAL + hdfsFS hdfsConnectNewInstance(const char* nn, tPort port); + + /** + * Connect to HDFS using the parameters defined by the builder. + * + * The HDFS builder will be freed, whether or not the connection was + * successful. + * + * Every successful call to hdfsBuilderConnect should be matched with a call + * to hdfsDisconnect, when the hdfsFS is no longer needed. + * + * @param bld The HDFS builder + * @return Returns a handle to the filesystem, or NULL on error. + */ + LIBHDFS_EXTERNAL + hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld); + + /** + * Create an HDFS builder. + * + * @return The HDFS builder, or NULL on error. 
+ */ + LIBHDFS_EXTERNAL + struct hdfsBuilder *hdfsNewBuilder(void); + + /** + * Force the builder to always create a new instance of the FileSystem, + * rather than possibly finding one in the cache. + * + * @param bld The HDFS builder + */ + LIBHDFS_EXTERNAL + void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld); + + /** + * Set the HDFS NameNode to connect to. + * + * @param bld The HDFS builder + * @param nn The NameNode to use. + * + * If the string given is 'default', the default NameNode + * configuration will be used (from the XML configuration files) + * + * If NULL is given, a LocalFileSystem will be created. + * + * If the string starts with a protocol type such as file:// or + * hdfs://, this protocol type will be used. If not, the + * hdfs:// protocol type will be used. + * + * You may specify a NameNode port in the usual way by + * passing a string of the format hdfs://:. + * Alternately, you may set the port with + * hdfsBuilderSetNameNodePort. However, you must not pass the + * port in two different ways. + */ + LIBHDFS_EXTERNAL + void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn); + + /** + * Set the port of the HDFS NameNode to connect to. + * + * @param bld The HDFS builder + * @param port The port. + */ + LIBHDFS_EXTERNAL + void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port); + + /** + * Set the username to use when connecting to the HDFS cluster. + * + * @param bld The HDFS builder + * @param userName The user name. The string will be shallow-copied. + */ + LIBHDFS_EXTERNAL + void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName); + + /** + * Set the path to the Kerberos ticket cache to use when connecting to + * the HDFS cluster. + * + * @param bld The HDFS builder + * @param kerbTicketCachePath The Kerberos ticket cache path. The string + * will be shallow-copied. + */ + LIBHDFS_EXTERNAL + void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld, + const char *kerbTicketCachePath); + + /** + * Free an HDFS builder. + * + * It is normally not necessary to call this function since + * hdfsBuilderConnect frees the builder. + * + * @param bld The HDFS builder + */ + LIBHDFS_EXTERNAL + void hdfsFreeBuilder(struct hdfsBuilder *bld); + + /** + * Set a configuration string for an HdfsBuilder. + * + * @param key The key to set. + * @param val The value, or NULL to set no value. + * This will be shallow-copied. You are responsible for + * ensuring that it remains valid until the builder is + * freed. + * + * @return 0 on success; nonzero error code otherwise. + */ + LIBHDFS_EXTERNAL + int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key, + const char *val); + + /** + * Get a configuration string. + * + * @param key The key to find + * @param val (out param) The value. This will be set to NULL if the + * key isn't found. You must free this string with + * hdfsConfStrFree. + * + * @return 0 on success; nonzero error code otherwise. + * Failure to find the key is not an error. + */ + LIBHDFS_EXTERNAL + int hdfsConfGetStr(const char *key, char **val); + + /** + * Get a configuration integer. + * + * @param key The key to find + * @param val (out param) The value. This will NOT be changed if the + * key isn't found. + * + * @return 0 on success; nonzero error code otherwise. + * Failure to find the key is not an error. + */ + LIBHDFS_EXTERNAL + int hdfsConfGetInt(const char *key, int32_t *val); + + /** + * Free a configuration string found with hdfsConfGetStr. 
+ * + * @param val A configuration string obtained from hdfsConfGetStr + */ + LIBHDFS_EXTERNAL + void hdfsConfStrFree(char *val); + + /** + * hdfsDisconnect - Disconnect from the hdfs file system. + * Disconnect from hdfs. + * @param fs The configured filesystem handle. + * @return Returns 0 on success, -1 on error. + * Even if there is an error, the resources associated with the + * hdfsFS will be freed. + */ + LIBHDFS_EXTERNAL + int hdfsDisconnect(hdfsFS fs); + + /** + * hdfsOpenFile - Open a hdfs file in given mode. + * @deprecated Use the hdfsStreamBuilder functions instead. + * This function does not support setting block sizes bigger than 2 GB. + * + * @param fs The configured filesystem handle. + * @param path The full path to the file. + * @param flags - an | of bits/fcntl.h file flags - supported flags are O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT), + * O_WRONLY|O_APPEND. Other flags are generally ignored other than (O_RDWR || (O_EXCL & O_CREAT)) which return NULL and set errno equal ENOTSUP. + * @param bufferSize Size of buffer for read/write - pass 0 if you want + * to use the default configured values. + * @param replication Block replication - pass 0 if you want to use + * the default configured values. + * @param blocksize Size of block - pass 0 if you want to use the + * default configured values. Note that if you want a block size bigger + * than 2 GB, you must use the hdfsStreamBuilder API rather than this + * deprecated function. + * @return Returns the handle to the open file or NULL on error. + */ + LIBHDFS_EXTERNAL + hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, + int bufferSize, short replication, tSize blocksize); + + /** + * hdfsStreamBuilderAlloc - Allocate an HDFS stream builder. + * + * @param fs The configured filesystem handle. + * @param path The full path to the file. Will be deep-copied. + * @param flags The open flags, as in hdfsOpenFile. + * @return Returns the hdfsStreamBuilder, or NULL on error. + */ + LIBHDFS_EXTERNAL + struct hdfsStreamBuilder *hdfsStreamBuilderAlloc(hdfsFS fs, + const char *path, int flags); + + /** + * hdfsStreamBuilderFree - Free an HDFS file builder. + * + * It is normally not necessary to call this function since + * hdfsStreamBuilderBuild frees the builder. + * + * @param bld The hdfsStreamBuilder to free. + */ + LIBHDFS_EXTERNAL + void hdfsStreamBuilderFree(struct hdfsStreamBuilder *bld); + + /** + * hdfsStreamBuilderSetBufferSize - Set the stream buffer size. + * + * @param bld The hdfs stream builder. + * @param bufferSize The buffer size to set. + * + * @return 0 on success, or -1 on error. Errno will be set on error. + */ + LIBHDFS_EXTERNAL + int hdfsStreamBuilderSetBufferSize(struct hdfsStreamBuilder *bld, + int32_t bufferSize); + + /** + * hdfsStreamBuilderSetReplication - Set the replication for the stream. + * This is only relevant for output streams, which will create new blocks. + * + * @param bld The hdfs stream builder. + * @param replication The replication to set. + * + * @return 0 on success, or -1 on error. Errno will be set on error. + * If you call this on an input stream builder, you will get + * EINVAL, because this configuration is not relevant to input + * streams. + */ + LIBHDFS_EXTERNAL + int hdfsStreamBuilderSetReplication(struct hdfsStreamBuilder *bld, + int16_t replication); + + /** + * hdfsStreamBuilderSetDefaultBlockSize - Set the default block size for + * the stream. This is only relevant for output streams, which will create + * new blocks. 
+ * + * @param bld The hdfs stream builder. + * @param defaultBlockSize The default block size to set. + * + * @return 0 on success, or -1 on error. Errno will be set on error. + * If you call this on an input stream builder, you will get + * EINVAL, because this configuration is not relevant to input + * streams. + */ + LIBHDFS_EXTERNAL + int hdfsStreamBuilderSetDefaultBlockSize(struct hdfsStreamBuilder *bld, + int64_t defaultBlockSize); + + /** + * hdfsStreamBuilderBuild - Build the stream by calling open or create. + * + * @param bld The hdfs stream builder. This pointer will be freed, whether + * or not the open succeeds. + * + * @return the stream pointer on success, or NULL on error. Errno will be + * set on error. + */ + LIBHDFS_EXTERNAL + hdfsFile hdfsStreamBuilderBuild(struct hdfsStreamBuilder *bld); + + /** + * hdfsTruncateFile - Truncate a hdfs file to given lenght. + * @param fs The configured filesystem handle. + * @param path The full path to the file. + * @param newlength The size the file is to be truncated to + * @return 1 if the file has been truncated to the desired newlength + * and is immediately available to be reused for write operations + * such as append. + * 0 if a background process of adjusting the length of the last + * block has been started, and clients should wait for it to + * complete before proceeding with further file updates. + * -1 on error. + */ + int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength); + + /** + * hdfsUnbufferFile - Reduce the buffering done on a file. + * + * @param file The file to unbuffer. + * @return 0 on success + * ENOTSUP if the file does not support unbuffering + * Errno will also be set to this value. + */ + LIBHDFS_EXTERNAL + int hdfsUnbufferFile(hdfsFile file); + + /** + * hdfsCloseFile - Close an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns 0 on success, -1 on error. + * On error, errno will be set appropriately. + * If the hdfs file was valid, the memory associated with it will + * be freed at the end of this call, even if there was an I/O + * error. + */ + LIBHDFS_EXTERNAL + int hdfsCloseFile(hdfsFS fs, hdfsFile file); + + + /** + * hdfsExists - Checks if a given path exsits on the filesystem + * @param fs The configured filesystem handle. + * @param path The path to look for + * @return Returns 0 on success, -1 on error. + */ + LIBHDFS_EXTERNAL + int hdfsExists(hdfsFS fs, const char *path); + + + /** + * hdfsSeek - Seek to given offset in file. + * This works only for files opened in read-only mode. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param desiredPos Offset into the file to seek into. + * @return Returns 0 on success, -1 on error. + */ + LIBHDFS_EXTERNAL + int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos); + + + /** + * hdfsTell - Get the current offset in the file, in bytes. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Current offset, -1 on error. + */ + LIBHDFS_EXTERNAL + tOffset hdfsTell(hdfsFS fs, hdfsFile file); + + + /** + * hdfsRead - Read data from an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param buffer The buffer to copy read bytes into. + * @param length The length of the buffer. + * @return On success, a positive number indicating how many bytes + * were read. + * On end-of-file, 0. + * On error, -1. Errno will be set to the error code. 
+ * Just like the POSIX read function, hdfsRead will return -1 + * and set errno to EINTR if data is temporarily unavailable, + * but we are not yet at the end of the file. + */ + LIBHDFS_EXTERNAL + tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length); + + /** + * hdfsPread - Positional read of data from an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param position Position from which to read + * @param buffer The buffer to copy read bytes into. + * @param length The length of the buffer. + * @return See hdfsRead + */ + LIBHDFS_EXTERNAL + tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, + void* buffer, tSize length); + + + /** + * hdfsWrite - Write data into an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param buffer The data. + * @param length The no. of bytes to write. + * @return Returns the number of bytes written, -1 on error. + */ + LIBHDFS_EXTERNAL + tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, + tSize length); + + + /** + * hdfsWrite - Flush the data. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns 0 on success, -1 on error. + */ + LIBHDFS_EXTERNAL + int hdfsFlush(hdfsFS fs, hdfsFile file); + + + /** + * hdfsHFlush - Flush out the data in client's user buffer. After the + * return of this call, new readers will see the data. + * @param fs configured filesystem handle + * @param file file handle + * @return 0 on success, -1 on error and sets errno + */ + LIBHDFS_EXTERNAL + int hdfsHFlush(hdfsFS fs, hdfsFile file); + + + /** + * hdfsHSync - Similar to posix fsync, Flush out the data in client's + * user buffer. all the way to the disk device (but the disk may have + * it in its cache). + * @param fs configured filesystem handle + * @param file file handle + * @return 0 on success, -1 on error and sets errno + */ + LIBHDFS_EXTERNAL + int hdfsHSync(hdfsFS fs, hdfsFile file); + + + /** + * hdfsAvailable - Number of bytes that can be read from this + * input stream without blocking. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns available bytes; -1 on error. + */ + LIBHDFS_EXTERNAL + int hdfsAvailable(hdfsFS fs, hdfsFile file); + + + /** + * hdfsCopy - Copy file from one filesystem to another. + * @param srcFS The handle to source filesystem. + * @param src The path of source file. + * @param dstFS The handle to destination filesystem. + * @param dst The path of destination file. + * @return Returns 0 on success, -1 on error. + */ + LIBHDFS_EXTERNAL + int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + + + /** + * hdfsMove - Move file from one filesystem to another. + * @param srcFS The handle to source filesystem. + * @param src The path of source file. + * @param dstFS The handle to destination filesystem. + * @param dst The path of destination file. + * @return Returns 0 on success, -1 on error. + */ + LIBHDFS_EXTERNAL + int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + + + /** + * hdfsDelete - Delete file. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @param recursive if path is a directory and set to + * non-zero, the directory is deleted else throws an exception. In + * case of a file the recursive argument is irrelevant. + * @return Returns 0 on success, -1 on error. 
+ */ + LIBHDFS_EXTERNAL + int hdfsDelete(hdfsFS fs, const char* path, int recursive); + + /** + * hdfsRename - Rename file. + * @param fs The configured filesystem handle. + * @param oldPath The path of the source file. + * @param newPath The path of the destination file. + * @return Returns 0 on success, -1 on error. + */ + LIBHDFS_EXTERNAL + int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath); + + + /** + * hdfsGetWorkingDirectory - Get the current working directory for + * the given filesystem. + * @param fs The configured filesystem handle. + * @param buffer The user-buffer to copy path of cwd into. + * @param bufferSize The length of user-buffer. + * @return Returns buffer, NULL on error. + */ + LIBHDFS_EXTERNAL + char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize); + + + /** + * hdfsSetWorkingDirectory - Set the working directory. All relative + * paths will be resolved relative to it. + * @param fs The configured filesystem handle. + * @param path The path of the new 'cwd'. + * @return Returns 0 on success, -1 on error. + */ + LIBHDFS_EXTERNAL + int hdfsSetWorkingDirectory(hdfsFS fs, const char* path); + + + /** + * hdfsCreateDirectory - Make the given file and all non-existent + * parents into directories. + * @param fs The configured filesystem handle. + * @param path The path of the directory. + * @return Returns 0 on success, -1 on error. + */ + LIBHDFS_EXTERNAL + int hdfsCreateDirectory(hdfsFS fs, const char* path); + + + /** + * hdfsSetReplication - Set the replication of the specified + * file to the supplied value + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @return Returns 0 on success, -1 on error. + */ + LIBHDFS_EXTERNAL + int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication); + + + /** + * hdfsFileInfo - Information about a file/directory. + */ + typedef struct { + tObjectKind mKind; /* file or directory */ + char *mName; /* the name of the file */ + tTime mLastMod; /* the last modification time for the file in seconds */ + tOffset mSize; /* the size of the file in bytes */ + short mReplication; /* the count of replicas */ + tOffset mBlockSize; /* the block size for the file */ + char *mOwner; /* the owner of the file */ + char *mGroup; /* the group associated with the file */ + short mPermissions; /* the permissions associated with the file */ + tTime mLastAccess; /* the last access time for the file in seconds */ + } hdfsFileInfo; + + + /** + * hdfsListDirectory - Get list of files/directories for a given + * directory-path. hdfsFreeFileInfo should be called to deallocate memory. + * @param fs The configured filesystem handle. + * @param path The path of the directory. + * @param numEntries Set to the number of files/directories in path. + * @return Returns a dynamically-allocated array of hdfsFileInfo + * objects; NULL on error or empty directory. + * errno is set to non-zero on error or zero on success. + */ + LIBHDFS_EXTERNAL + hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, + int *numEntries); + + + /** + * hdfsGetPathInfo - Get information about a path as a (dynamically + * allocated) single hdfsFileInfo struct. hdfsFreeFileInfo should be + * called when the pointer is no longer needed. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @return Returns a dynamically-allocated hdfsFileInfo object; + * NULL on error. 
+ */ + LIBHDFS_EXTERNAL + hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path); + + + /** + * hdfsFreeFileInfo - Free up the hdfsFileInfo array (including fields) + * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo + * objects. + * @param numEntries The size of the array. + */ + LIBHDFS_EXTERNAL + void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries); + + /** + * hdfsFileIsEncrypted: determine if a file is encrypted based on its + * hdfsFileInfo. + * @return -1 if there was an error (errno will be set), 0 if the file is + * not encrypted, 1 if the file is encrypted. + */ + LIBHDFS_EXTERNAL + int hdfsFileIsEncrypted(hdfsFileInfo *hdfsFileInfo); + + + /** + * hdfsGetHosts - Get hostnames where a particular block (determined by + * pos & blocksize) of a file is stored. The last element in the array + * is NULL. Due to replication, a single block could be present on + * multiple hosts. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @param start The start of the block. + * @param length The length of the block. + * @return Returns a dynamically-allocated 2-d array of blocks-hosts; + * NULL on error. + */ + LIBHDFS_EXTERNAL + char*** hdfsGetHosts(hdfsFS fs, const char* path, + tOffset start, tOffset length); + + + /** + * hdfsFreeHosts - Free up the structure returned by hdfsGetHosts + * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo + * objects. + * @param numEntries The size of the array. + */ + LIBHDFS_EXTERNAL + void hdfsFreeHosts(char ***blockHosts); + + + /** + * hdfsGetDefaultBlockSize - Get the default blocksize. + * + * @param fs The configured filesystem handle. + * @deprecated Use hdfsGetDefaultBlockSizeAtPath instead. + * + * @return Returns the default blocksize, or -1 on error. + */ + LIBHDFS_EXTERNAL + tOffset hdfsGetDefaultBlockSize(hdfsFS fs); + + + /** + * hdfsGetDefaultBlockSizeAtPath - Get the default blocksize at the + * filesystem indicated by a given path. + * + * @param fs The configured filesystem handle. + * @param path The given path will be used to locate the actual + * filesystem. The full path does not have to exist. + * + * @return Returns the default blocksize, or -1 on error. + */ + LIBHDFS_EXTERNAL + tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path); + + + /** + * hdfsGetCapacity - Return the raw capacity of the filesystem. + * @param fs The configured filesystem handle. + * @return Returns the raw-capacity; -1 on error. + */ + LIBHDFS_EXTERNAL + tOffset hdfsGetCapacity(hdfsFS fs); + + + /** + * hdfsGetUsed - Return the total raw size of all files in the filesystem. + * @param fs The configured filesystem handle. + * @return Returns the total-size; -1 on error. + */ + LIBHDFS_EXTERNAL + tOffset hdfsGetUsed(hdfsFS fs); + + /** + * Change the user and/or group of a file or directory. + * + * @param fs The configured filesystem handle. + * @param path the path to the file or directory + * @param owner User string. Set to NULL for 'no change' + * @param group Group string. Set to NULL for 'no change' + * @return 0 on success else -1 + */ + LIBHDFS_EXTERNAL + int hdfsChown(hdfsFS fs, const char* path, const char *owner, + const char *group); + + /** + * hdfsChmod + * @param fs The configured filesystem handle. 
+ * @param path the path to the file or directory + * @param mode the bitmask to set it to + * @return 0 on success else -1 + */ + LIBHDFS_EXTERNAL + int hdfsChmod(hdfsFS fs, const char* path, short mode); + + /** + * hdfsUtime + * @param fs The configured filesystem handle. + * @param path the path to the file or directory + * @param mtime new modification time or -1 for no change + * @param atime new access time or -1 for no change + * @return 0 on success else -1 + */ + LIBHDFS_EXTERNAL + int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime); + + /** + * Allocate a zero-copy options structure. + * + * You must free all options structures allocated with this function using + * hadoopRzOptionsFree. + * + * @return A zero-copy options structure, or NULL if one could + * not be allocated. If NULL is returned, errno will + * contain the error number. + */ + LIBHDFS_EXTERNAL + struct hadoopRzOptions *hadoopRzOptionsAlloc(void); + + /** + * Determine whether we should skip checksums in read0. + * + * @param opts The options structure. + * @param skip Nonzero to skip checksums sometimes; zero to always + * check them. + * + * @return 0 on success; -1 plus errno on failure. + */ + LIBHDFS_EXTERNAL + int hadoopRzOptionsSetSkipChecksum( + struct hadoopRzOptions *opts, int skip); + + /** + * Set the ByteBufferPool to use with read0. + * + * @param opts The options structure. + * @param className If this is NULL, we will not use any + * ByteBufferPool. If this is non-NULL, it will be + * treated as the name of the pool class to use. + * For example, you can use + * ELASTIC_BYTE_BUFFER_POOL_CLASS. + * + * @return 0 if the ByteBufferPool class was found and + * instantiated; + * -1 plus errno otherwise. + */ + LIBHDFS_EXTERNAL + int hadoopRzOptionsSetByteBufferPool( + struct hadoopRzOptions *opts, const char *className); + + /** + * Free a hadoopRzOptionsFree structure. + * + * @param opts The options structure to free. + * Any associated ByteBufferPool will also be freed. + */ + LIBHDFS_EXTERNAL + void hadoopRzOptionsFree(struct hadoopRzOptions *opts); + + /** + * Perform a byte buffer read. + * If possible, this will be a zero-copy (mmap) read. + * + * @param file The file to read from. + * @param opts An options structure created by hadoopRzOptionsAlloc. + * @param maxLength The maximum length to read. We may read fewer bytes + * than this length. + * + * @return On success, we will return a new hadoopRzBuffer. + * This buffer will continue to be valid and readable + * until it is released by readZeroBufferFree. Failure to + * release a buffer will lead to a memory leak. + * You can access the data within the hadoopRzBuffer with + * hadoopRzBufferGet. If you have reached EOF, the data + * within the hadoopRzBuffer will be NULL. You must still + * free hadoopRzBuffer instances containing NULL. + * + * On failure, we will return NULL plus an errno code. + * errno = EOPNOTSUPP indicates that we could not do a + * zero-copy read, and there was no ByteBufferPool + * supplied. + */ + LIBHDFS_EXTERNAL + struct hadoopRzBuffer* hadoopReadZero(hdfsFile file, + struct hadoopRzOptions *opts, int32_t maxLength); + + /** + * Determine the length of the buffer returned from readZero. + * + * @param buffer a buffer returned from readZero. + * @return the length of the buffer. + */ + LIBHDFS_EXTERNAL + int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer); + + /** + * Get a pointer to the raw buffer returned from readZero. 
+ * + * To find out how many bytes this buffer contains, call + * hadoopRzBufferLength. + * + * @param buffer a buffer returned from readZero. + * @return a pointer to the start of the buffer. This will be + * NULL when end-of-file has been reached. + */ + LIBHDFS_EXTERNAL + const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer); + + /** + * Release a buffer obtained through readZero. + * + * @param file The hdfs stream that created this buffer. This must be + * the same stream you called hadoopReadZero on. + * @param buffer The buffer to release. + */ + LIBHDFS_EXTERNAL + void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer); + +#ifdef __cplusplus +} +#endif + +#undef LIBHDFS_EXTERNAL +#endif /*LIBHDFS_HDFS_H*/ + +/** + * vim: ts=4: sw=4: et + */ diff --git a/dev/merge_arrow_pr.py b/dev/merge_arrow_pr.py index 981779ffb4c76..8f47f93b26dd1 100755 --- a/dev/merge_arrow_pr.py +++ b/dev/merge_arrow_pr.py @@ -173,7 +173,10 @@ def merge_pr(pr_num, target_ref): for c in commits: merge_message_flags += ["-m", c] - run_cmd(['git', 'commit', '--author="%s"' % primary_author] + merge_message_flags) + run_cmd(['git', 'commit', + '--no-verify', # do not run commit hooks + '--author="%s"' % primary_author] + + merge_message_flags) continue_maybe("Merge complete (local ref %s). Push to %s?" % ( target_branch_name, PUSH_REMOTE_NAME)) diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index f1becfcf44964..fdbfce99656ca 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -348,8 +348,10 @@ find_package(Arrow REQUIRED) include_directories(SYSTEM ${ARROW_INCLUDE_DIR}) ADD_THIRDPARTY_LIB(arrow SHARED_LIB ${ARROW_SHARED_LIB}) +ADD_THIRDPARTY_LIB(arrow_io + SHARED_LIB ${ARROW_IO_SHARED_LIB}) ADD_THIRDPARTY_LIB(arrow_parquet - SHARED_LIB ${ARROW_PARQUET_SHARED_LIB}) + SHARED_LIB ${ARROW_PARQUET_SHARED_LIB}) ############################################################ # Linker setup @@ -428,6 +430,7 @@ set(PYARROW_SRCS set(LINK_LIBS arrow + arrow_io arrow_parquet ) @@ -449,6 +452,7 @@ set(CYTHON_EXTENSIONS array config error + io parquet scalar schema diff --git a/python/cmake_modules/FindArrow.cmake b/python/cmake_modules/FindArrow.cmake index f0b258ed027b0..6bd305615fce2 100644 --- a/python/cmake_modules/FindArrow.cmake +++ b/python/cmake_modules/FindArrow.cmake @@ -47,13 +47,24 @@ find_library(ARROW_PARQUET_LIB_PATH NAMES arrow_parquet ${ARROW_SEARCH_LIB_PATH} NO_DEFAULT_PATH) +find_library(ARROW_IO_LIB_PATH NAMES arrow_io + PATHS + ${ARROW_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) + if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH AND ARROW_PARQUET_LIB_PATH) set(ARROW_FOUND TRUE) set(ARROW_LIB_NAME libarrow) + set(ARROW_IO_LIB_NAME libarrow_io) set(ARROW_PARQUET_LIB_NAME libarrow_parquet) + set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH}) set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a) set(ARROW_SHARED_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + + set(ARROW_IO_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IO_LIB_NAME}.a) + set(ARROW_IO_SHARED_LIB ${ARROW_LIBS}/${ARROW_IO_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + set(ARROW_PARQUET_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_PARQUET_LIB_NAME}.a) set(ARROW_PARQUET_SHARED_LIB ${ARROW_LIBS}/${ARROW_PARQUET_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) else () @@ -62,7 +73,9 @@ endif () if (ARROW_FOUND) if (NOT Arrow_FIND_QUIETLY) - message(STATUS "Found the Arrow library: ${ARROW_LIB_PATH}, ${ARROW_PARQUET_LIB_PATH}") + message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}") + 
message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}") + message(STATUS "Found the Arrow Parquet library: ${ARROW_PARQUET_LIB_PATH}") endif () else () if (NOT Arrow_FIND_QUIETLY) @@ -82,6 +95,8 @@ mark_as_advanced( ARROW_LIBS ARROW_STATIC_LIB ARROW_SHARED_LIB + ARROW_IO_STATIC_LIB + ARROW_IO_SHARED_LIB ARROW_PARQUET_STATIC_LIB ARROW_PARQUET_SHARED_LIB ) diff --git a/python/conda.recipe/meta.yaml b/python/conda.recipe/meta.yaml index 85d24b6bc322e..98ae4141e3bd7 100644 --- a/python/conda.recipe/meta.yaml +++ b/python/conda.recipe/meta.yaml @@ -26,6 +26,7 @@ requirements: run: - arrow-cpp + - parquet-cpp - python - numpy - pandas diff --git a/python/pyarrow/error.pxd b/python/pyarrow/error.pxd index 97ba0ef2e9fcb..1fb6fad396a8b 100644 --- a/python/pyarrow/error.pxd +++ b/python/pyarrow/error.pxd @@ -18,5 +18,5 @@ from pyarrow.includes.libarrow cimport CStatus from pyarrow.includes.pyarrow cimport * -cdef check_cstatus(const CStatus& status) -cdef check_status(const Status& status) +cdef int check_cstatus(const CStatus& status) nogil except -1 +cdef int check_status(const Status& status) nogil except -1 diff --git a/python/pyarrow/error.pyx b/python/pyarrow/error.pyx index 5a6a038a92e43..244019321a7fd 100644 --- a/python/pyarrow/error.pyx +++ b/python/pyarrow/error.pyx @@ -22,16 +22,18 @@ from pyarrow.compat import frombytes class ArrowException(Exception): pass -cdef check_cstatus(const CStatus& status): +cdef int check_cstatus(const CStatus& status) nogil except -1: if status.ok(): - return + return 0 cdef c_string c_message = status.ToString() - raise ArrowException(frombytes(c_message)) + with gil: + raise ArrowException(frombytes(c_message)) -cdef check_status(const Status& status): +cdef int check_status(const Status& status) nogil except -1: if status.ok(): - return + return 0 cdef c_string c_message = status.ToString() - raise ArrowException(frombytes(c_message)) + with gil: + raise ArrowException(frombytes(c_message)) diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd index 1f6ecee510521..133797bc37b5c 100644 --- a/python/pyarrow/includes/common.pxd +++ b/python/pyarrow/includes/common.pxd @@ -33,3 +33,21 @@ cdef extern from "": cdef extern from "": void Py_XDECREF(PyObject* o) +cdef extern from "arrow/api.h" namespace "arrow" nogil: + # We can later add more of the common status factory methods as needed + cdef CStatus CStatus_OK "Status::OK"() + + cdef cppclass CStatus "arrow::Status": + CStatus() + + c_string ToString() + + c_bool ok() + c_bool IsOutOfMemory() + c_bool IsKeyError() + c_bool IsNotImplemented() + c_bool IsInvalid() + + cdef cppclass Buffer: + uint8_t* data() + int64_t size() diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 90414e3d542db..91ce069df8f42 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -19,25 +19,6 @@ from pyarrow.includes.common cimport * -cdef extern from "arrow/api.h" namespace "arrow" nogil: - # We can later add more of the common status factory methods as needed - cdef CStatus CStatus_OK "Status::OK"() - - cdef cppclass CStatus "arrow::Status": - CStatus() - - c_string ToString() - - c_bool ok() - c_bool IsOutOfMemory() - c_bool IsKeyError() - c_bool IsNotImplemented() - c_bool IsInvalid() - - cdef cppclass Buffer: - uint8_t* data() - int64_t size() - cdef extern from "arrow/api.h" namespace "arrow" nogil: enum Type" arrow::Type::type": diff --git a/python/pyarrow/includes/libarrow_io.pxd 
b/python/pyarrow/includes/libarrow_io.pxd new file mode 100644 index 0000000000000..d874ba3091237 --- /dev/null +++ b/python/pyarrow/includes/libarrow_io.pxd @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# distutils: language = c++ + +from pyarrow.includes.common cimport * + +cdef extern from "arrow/io/interfaces.h" nogil: + enum ObjectType" arrow::io::ObjectType::type": + ObjectType_FILE" arrow::io::ObjectType::FILE" + ObjectType_DIRECTORY" arrow::io::ObjectType::DIRECTORY" + +cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: + CStatus ConnectLibHdfs() + + cdef cppclass HdfsConnectionConfig: + c_string host + int port + c_string user + + cdef cppclass HdfsPathInfo: + ObjectType kind; + c_string name + c_string owner + c_string group + int32_t last_modified_time + int32_t last_access_time + int64_t size + int16_t replication + int64_t block_size + int16_t permissions + + cdef cppclass CHdfsFile: + CStatus Close() + CStatus Seek(int64_t position) + CStatus Tell(int64_t* position) + + cdef cppclass HdfsReadableFile(CHdfsFile): + CStatus GetSize(int64_t* size) + CStatus Read(int32_t nbytes, int32_t* bytes_read, + uint8_t* buffer) + + CStatus ReadAt(int64_t position, int32_t nbytes, + int32_t* bytes_read, uint8_t* buffer) + + cdef cppclass HdfsWriteableFile(CHdfsFile): + CStatus Write(const uint8_t* buffer, int32_t nbytes) + + CStatus Write(const uint8_t* buffer, int32_t nbytes, + int32_t* bytes_written) + + cdef cppclass CHdfsClient" arrow::io::HdfsClient": + @staticmethod + CStatus Connect(const HdfsConnectionConfig* config, + shared_ptr[CHdfsClient]* client) + + CStatus CreateDirectory(const c_string& path) + + CStatus Delete(const c_string& path, c_bool recursive) + + CStatus Disconnect() + + c_bool Exists(const c_string& path) + + CStatus GetCapacity(int64_t* nbytes) + CStatus GetUsed(int64_t* nbytes) + + CStatus ListDirectory(const c_string& path, + vector[HdfsPathInfo]* listing) + + CStatus Rename(const c_string& src, const c_string& dst) + + CStatus OpenReadable(const c_string& path, + shared_ptr[HdfsReadableFile]* handle) + + CStatus OpenWriteable(const c_string& path, c_bool append, + int32_t buffer_size, int16_t replication, + int64_t default_block_size, + shared_ptr[HdfsWriteableFile]* handle) diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx new file mode 100644 index 0000000000000..15c0f898ab21d --- /dev/null +++ b/python/pyarrow/io.pyx @@ -0,0 +1,503 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Cython wrappers for IO interfaces defined in arrow/io + +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from libc.stdlib cimport malloc, free + +from pyarrow.includes.libarrow cimport * +cimport pyarrow.includes.pyarrow as pyarrow +from pyarrow.includes.libarrow_io cimport * + +from pyarrow.compat import frombytes, tobytes +from pyarrow.error cimport check_cstatus + +cimport cpython as cp + +import re +import threading + +_HDFS_PATH_RE = re.compile('hdfs://(.*):(\d+)(.*)') + +try: + # Python 3 + from queue import Queue, Empty as QueueEmpty, Full as QueueFull +except ImportError: + from Queue import Queue, Empty as QueueEmpty, Full as QueueFull + + +def have_libhdfs(): + try: + check_cstatus(ConnectLibHdfs()) + return True + except: + return False + + +def strip_hdfs_abspath(path): + m = _HDFS_PATH_RE.match(path) + if m: + return m.group(3) + else: + return path + + +cdef class HdfsClient: + cdef: + shared_ptr[CHdfsClient] client + + cdef readonly: + object host + int port + object user + bint is_open + + def __cinit__(self): + self.is_open = False + + def __dealloc__(self): + if self.is_open: + self.close() + + def close(self): + self._ensure_client() + with nogil: + check_cstatus(self.client.get().Disconnect()) + self.is_open = False + + cdef _ensure_client(self): + if self.client.get() == NULL: + raise IOError('HDFS client improperly initialized') + elif not self.is_open: + raise IOError('HDFS client is closed') + + @classmethod + def connect(cls, host, port, user): + """ + + Parameters + ---------- + host : + port : + user : + + Notes + ----- + The first time you call this method, it will take longer than usual due + to JNI spin-up time. + + Returns + ------- + client : HDFSClient + """ + cdef: + HdfsClient out = HdfsClient() + HdfsConnectionConfig conf + + conf.host = tobytes(host) + conf.port = port + conf.user = tobytes(user) + + with nogil: + check_cstatus( + CHdfsClient.Connect(&conf, &out.client)) + out.is_open = True + + return out + + def exists(self, path): + """ + Returns True if the path is known to the cluster, False if it does not + (or there is an RPC error) + """ + self._ensure_client() + + cdef c_string c_path = tobytes(path) + cdef c_bool result + with nogil: + result = self.client.get().Exists(c_path) + return result + + def ls(self, path, bint full_info=True): + """ + Retrieve directory contents and metadata, if requested. 
+ + Parameters + ---------- + path : HDFS path + full_info : boolean, default True + If False, only return list of paths + + Returns + ------- + result : list of dicts (full_info=True) or strings (full_info=False) + """ + cdef: + c_string c_path = tobytes(path) + vector[HdfsPathInfo] listing + list results = [] + int i + + self._ensure_client() + + with nogil: + check_cstatus(self.client.get() + .ListDirectory(c_path, &listing)) + + cdef const HdfsPathInfo* info + for i in range(listing.size()): + info = &listing[i] + + # Try to trim off the hdfs://HOST:PORT piece + name = strip_hdfs_abspath(frombytes(info.name)) + + if full_info: + kind = ('file' if info.kind == ObjectType_FILE + else 'directory') + + results.append({ + 'kind': kind, + 'name': name, + 'owner': frombytes(info.owner), + 'group': frombytes(info.group), + 'list_modified_time': info.last_modified_time, + 'list_access_time': info.last_access_time, + 'size': info.size, + 'replication': info.replication, + 'block_size': info.block_size, + 'permissions': info.permissions + }) + else: + results.append(name) + + return results + + def mkdir(self, path): + """ + Create indicated directory and any necessary parent directories + """ + self._ensure_client() + + cdef c_string c_path = tobytes(path) + with nogil: + check_cstatus(self.client.get() + .CreateDirectory(c_path)) + + def delete(self, path, bint recursive=False): + """ + Delete the indicated file or directory + + Parameters + ---------- + path : string + recursive : boolean, default False + If True, also delete child paths for directories + """ + self._ensure_client() + + cdef c_string c_path = tobytes(path) + with nogil: + check_cstatus(self.client.get() + .Delete(c_path, recursive)) + + def open(self, path, mode='rb', buffer_size=None, replication=None, + default_block_size=None): + """ + Parameters + ---------- + mode : string, 'rb', 'wb', 'ab' + """ + self._ensure_client() + + cdef HdfsFile out = HdfsFile() + + if mode not in ('rb', 'wb', 'ab'): + raise Exception("Mode must be 'rb' (read), " + "'wb' (write, new file), or 'ab' (append)") + + cdef c_string c_path = tobytes(path) + cdef c_bool append = False + + # 0 in libhdfs means "use the default" + cdef int32_t c_buffer_size = buffer_size or 0 + cdef int16_t c_replication = replication or 0 + cdef int64_t c_default_block_size = default_block_size or 0 + + if mode in ('wb', 'ab'): + if mode == 'ab': + append = True + + with nogil: + check_cstatus( + self.client.get() + .OpenWriteable(c_path, append, c_buffer_size, + c_replication, c_default_block_size, + &out.wr_file)) + + out.is_readonly = False + else: + with nogil: + check_cstatus(self.client.get() + .OpenReadable(c_path, &out.rd_file)) + out.is_readonly = True + + if c_buffer_size == 0: + c_buffer_size = 2 ** 16 + + out.mode = mode + out.buffer_size = c_buffer_size + out.parent = self + out.is_open = True + + return out + + def upload(self, path, stream, buffer_size=2**16): + """ + Upload file-like object to HDFS path + """ + write_queue = Queue(50) + + f = self.open(path, 'wb') + + done = False + exception = None + def bg_write(): + try: + while not done or write_queue.qsize() > 0: + try: + buf = write_queue.get(timeout=0.01) + except QueueEmpty: + continue + + f.write(buf) + + except Exception as e: + exception = e + + writer_thread = threading.Thread(target=bg_write) + writer_thread.start() + + try: + while True: + buf = stream.read(buffer_size) + if not buf: + break + + write_queue.put_nowait(buf) + finally: + done = True + + writer_thread.join() + if exception is 
not None: + raise exception + + def download(self, path, stream, buffer_size=None): + f = self.open(path, 'rb', buffer_size=buffer_size) + f.download(stream) + + +cdef class HdfsFile: + cdef: + shared_ptr[HdfsReadableFile] rd_file + shared_ptr[HdfsWriteableFile] wr_file + bint is_readonly + bint is_open + object parent + + cdef readonly: + int32_t buffer_size + object mode + + def __cinit__(self): + self.is_open = False + + def __dealloc__(self): + if self.is_open: + self.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + self.close() + + def close(self): + if self.is_open: + with nogil: + if self.is_readonly: + check_cstatus(self.rd_file.get().Close()) + else: + check_cstatus(self.wr_file.get().Close()) + self.is_open = False + + cdef _assert_readable(self): + if not self.is_readonly: + raise IOError("only valid on readonly files") + + cdef _assert_writeable(self): + if self.is_readonly: + raise IOError("only valid on writeonly files") + + def size(self): + cdef int64_t size + self._assert_readable() + with nogil: + check_cstatus(self.rd_file.get().GetSize(&size)) + return size + + def tell(self): + cdef int64_t position + with nogil: + if self.is_readonly: + check_cstatus(self.rd_file.get().Tell(&position)) + else: + check_cstatus(self.wr_file.get().Tell(&position)) + return position + + def seek(self, int64_t position): + self._assert_readable() + with nogil: + check_cstatus(self.rd_file.get().Seek(position)) + + def read(self, int nbytes): + """ + Read indicated number of bytes from the file, up to EOF + """ + cdef: + int32_t bytes_read = 0 + uint8_t* buf + + self._assert_readable() + + # This isn't ideal -- PyBytes_FromStringAndSize copies the data from + # the passed buffer, so it's hard for us to avoid doubling the memory + buf = malloc(nbytes) + if buf == NULL: + raise MemoryError("Failed to allocate {0} bytes".format(nbytes)) + + cdef int32_t total_bytes = 0 + + cdef int rpc_chunksize = min(self.buffer_size, nbytes) + + try: + with nogil: + while total_bytes < nbytes: + check_cstatus(self.rd_file.get() + .Read(rpc_chunksize, &bytes_read, + buf + total_bytes)) + + total_bytes += bytes_read + + # EOF + if bytes_read == 0: + break + result = cp.PyBytes_FromStringAndSize(buf, + total_bytes) + finally: + free(buf) + + return result + + def download(self, stream_or_path): + """ + Read file completely to local path (rather than reading completely into + memory). First seeks to the beginning of the file. 
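+
+        Example (an illustrative sketch only; 'client' is assumed to be a
+        connected HdfsClient and both paths are hypothetical):
+
+            f = client.open('/hypothetical/remote/data.bin', 'rb')
+            f.download('/tmp/local-copy.bin')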
+ """ + cdef: + int32_t bytes_read = 0 + uint8_t* buf + self._assert_readable() + + write_queue = Queue(50) + + if not hasattr(stream_or_path, 'read'): + stream = open(stream_or_path, 'wb') + cleanup = lambda: stream.close() + else: + stream = stream_or_path + cleanup = lambda: None + + done = False + exception = None + def bg_write(): + try: + while not done or write_queue.qsize() > 0: + try: + buf = write_queue.get(timeout=0.01) + except QueueEmpty: + continue + stream.write(buf) + except Exception as e: + exception = e + finally: + cleanup() + + self.seek(0) + + writer_thread = threading.Thread(target=bg_write) + + # This isn't ideal -- PyBytes_FromStringAndSize copies the data from + # the passed buffer, so it's hard for us to avoid doubling the memory + buf = malloc(self.buffer_size) + if buf == NULL: + raise MemoryError("Failed to allocate {0} bytes" + .format(self.buffer_size)) + + writer_thread.start() + + cdef int64_t total_bytes = 0 + + try: + while True: + with nogil: + check_cstatus(self.rd_file.get() + .Read(self.buffer_size, &bytes_read, buf)) + + total_bytes += bytes_read + + # EOF + if bytes_read == 0: + break + + pybuf = cp.PyBytes_FromStringAndSize(buf, + bytes_read) + + write_queue.put_nowait(pybuf) + finally: + free(buf) + done = True + + writer_thread.join() + if exception is not None: + raise exception + + def write(self, data): + """ + Write bytes-like (unicode, encoded to UTF-8) to file + """ + self._assert_writeable() + + data = tobytes(data) + + cdef const uint8_t* buf = cp.PyBytes_AS_STRING(data) + cdef int32_t bufsize = len(data) + with nogil: + check_cstatus(self.wr_file.get().Write(buf, bufsize)) diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index bf5a22089cdba..86147f8df5a11 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -15,25 +15,24 @@ # specific language governing permissions and limitations # under the License. 
-from pyarrow.compat import unittest import pyarrow import pyarrow.formatting as fmt -class TestArrayAPI(unittest.TestCase): +def test_repr_on_pre_init_array(): + arr = pyarrow.array.Array() + assert len(repr(arr)) > 0 - def test_repr_on_pre_init_array(self): - arr = pyarrow.array.Array() - assert len(repr(arr)) > 0 - def test_getitem_NA(self): - arr = pyarrow.from_pylist([1, None, 2]) - assert arr[1] is pyarrow.NA +def test_getitem_NA(): + arr = pyarrow.from_pylist([1, None, 2]) + assert arr[1] is pyarrow.NA - def test_list_format(self): - arr = pyarrow.from_pylist([[1], None, [2, 3, None]]) - result = fmt.array_format(arr) - expected = """\ + +def test_list_format(): + arr = pyarrow.from_pylist([[1], None, [2, 3, None]]) + result = fmt.array_format(arr) + expected = """\ [ [1], NA, @@ -41,23 +40,25 @@ def test_list_format(self): 3, NA] ]""" - assert result == expected + assert result == expected + - def test_string_format(self): - arr = pyarrow.from_pylist(['', None, 'foo']) - result = fmt.array_format(arr) - expected = """\ +def test_string_format(): + arr = pyarrow.from_pylist(['', None, 'foo']) + result = fmt.array_format(arr) + expected = """\ [ '', NA, 'foo' ]""" - assert result == expected + assert result == expected + - def test_long_array_format(self): - arr = pyarrow.from_pylist(range(100)) - result = fmt.array_format(arr, window=2) - expected = """\ +def test_long_array_format(): + arr = pyarrow.from_pylist(range(100)) + result = fmt.array_format(arr, window=2) + expected = """\ [ 0, 1, @@ -65,4 +66,4 @@ def test_long_array_format(self): 98, 99 ]""" - assert result == expected + assert result == expected diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py new file mode 100644 index 0000000000000..328e923b941a4 --- /dev/null +++ b/python/pyarrow/tests/test_io.py @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
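+
+# The HDFS tests in this module are skipped unless libhdfs can be loaded
+# (see the 'libhdfs' marker below). They also expect a reachable HDFS
+# cluster, configured through the environment variables ARROW_HDFS_TEST_HOST
+# (default 'localhost'), ARROW_HDFS_TEST_PORT (default 20500) and
+# ARROW_HDFS_TEST_USER (required), as read by hdfs_test_client().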
+ +from io import BytesIO +from os.path import join as pjoin +import os +import random + +import pytest + +import pyarrow.io as io + +#---------------------------------------------------------------------- +# HDFS tests + + +def hdfs_test_client(): + host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost') + user = os.environ['ARROW_HDFS_TEST_USER'] + try: + port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 20500)) + except ValueError: + raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not ' + 'an integer') + + return io.HdfsClient.connect(host, port, user) + + +libhdfs = pytest.mark.skipif(not io.have_libhdfs(), + reason='No libhdfs available on system') + + +HDFS_TMP_PATH = '/tmp/pyarrow-test-{0}'.format(random.randint(0, 1000)) + +@pytest.fixture(scope='session') +def hdfs(request): + fixture = hdfs_test_client() + def teardown(): + fixture.delete(HDFS_TMP_PATH, recursive=True) + fixture.close() + request.addfinalizer(teardown) + return fixture + + +@libhdfs +def test_hdfs_close(): + client = hdfs_test_client() + assert client.is_open + client.close() + assert not client.is_open + + with pytest.raises(Exception): + client.ls('/') + + +@libhdfs +def test_hdfs_mkdir(hdfs): + path = pjoin(HDFS_TMP_PATH, 'test-dir/test-dir') + parent_path = pjoin(HDFS_TMP_PATH, 'test-dir') + + hdfs.mkdir(path) + assert hdfs.exists(path) + + hdfs.delete(parent_path, recursive=True) + assert not hdfs.exists(path) + + +@libhdfs +def test_hdfs_ls(hdfs): + base_path = pjoin(HDFS_TMP_PATH, 'ls-test') + hdfs.mkdir(base_path) + + dir_path = pjoin(base_path, 'a-dir') + f1_path = pjoin(base_path, 'a-file-1') + + hdfs.mkdir(dir_path) + + f = hdfs.open(f1_path, 'wb') + f.write('a' * 10) + + contents = sorted(hdfs.ls(base_path, False)) + assert contents == [dir_path, f1_path] + + +@libhdfs +def test_hdfs_download_upload(hdfs): + base_path = pjoin(HDFS_TMP_PATH, 'upload-test') + + data = b'foobarbaz' + buf = BytesIO(data) + buf.seek(0) + + hdfs.upload(base_path, buf) + + out_buf = BytesIO() + hdfs.download(base_path, out_buf) + out_buf.seek(0) + assert out_buf.getvalue() == data + + +@libhdfs +def test_hdfs_file_context_manager(hdfs): + path = pjoin(HDFS_TMP_PATH, 'ctx-manager') + + data = b'foo' + with hdfs.open(path, 'wb') as f: + f.write(data) + + with hdfs.open(path, 'rb') as f: + assert f.size() == 3 + result = f.read(10) + assert result == data diff --git a/python/setup.py b/python/setup.py index 7edeb9143319b..59410d75a61e2 100644 --- a/python/setup.py +++ b/python/setup.py @@ -214,7 +214,14 @@ def get_ext_built(self, name): return name + suffix def get_cmake_cython_names(self): - return ['array', 'config', 'error', 'parquet', 'scalar', 'schema', 'table'] + return ['array', + 'config', + 'error', + 'io', + 'parquet', + 'scalar', + 'schema', + 'table'] def get_names(self): return self._found_names
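
A minimal usage sketch of the pyarrow.io HDFS bindings added by this patch. It is illustrative only: the host, port, user name and paths are placeholders for a real HDFS deployment, and it assumes libhdfs and libjvm can be located by the shim (JAVA_HOME is one of the search hints used for libjvm):

    import pyarrow.io as io

    if io.have_libhdfs():
        client = io.HdfsClient.connect('localhost', 8020, 'hdfs-user')

        client.mkdir('/tmp/pyarrow-demo')

        # Write a small file, then read it back
        with client.open('/tmp/pyarrow-demo/data.bin', 'wb') as f:
            f.write(b'some bytes')

        with client.open('/tmp/pyarrow-demo/data.bin', 'rb') as f:
            assert f.read(100) == b'some bytes'

        # Directory listing (names only), then cleanup
        print(client.ls('/tmp/pyarrow-demo', False))
        client.delete('/tmp/pyarrow-demo', recursive=True)
        client.close()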