Skip to content

Commit

Permalink
Working arrow tests
Browse files Browse the repository at this point in the history
  • Loading branch information
willdealtry committed Sep 12, 2024
1 parent 52cdc81 commit 9483af3
Show file tree
Hide file tree
Showing 14 changed files with 161 additions and 18 deletions.
6 changes: 6 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,9 @@
[submodule "cpp/vcpkg"]
path = cpp/vcpkg
url = https://github.com/microsoft/vcpkg.git
[submodule "cpp/third_party/sparrow"]
path = cpp/third_party/sparrow
url = https://github.com/man-group/sparrow.git
[submodule "cpp/third_party/date"]
path = cpp/third_party/date
url = https://github.com/HowardHinnant/date.git
10 changes: 9 additions & 1 deletion cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ find_package(zstd CONFIG REQUIRED) # "CONFIG" bypasses our cpp/CMake/FindZstd.c

find_package(azure-identity-cpp CONFIG REQUIRED)
find_package(azure-storage-blobs-cpp CONFIG REQUIRED)
find_package(sparrow REQUIRED)

if(${BUILD_WITH_REMOTERY})
add_compile_definitions(USE_REMOTERY)
Expand Down Expand Up @@ -503,7 +504,7 @@ set(arcticdb_srcs
version/symbol_list.cpp
version/version_map_batch_methods.cpp
storage/s3/ec2_utils.cpp
util/buffer_holder.cpp)
util/buffer_holder.cpp arrow/arrow_utils.hpp)

add_library(arcticdb_core_object OBJECT ${arcticdb_srcs})

Expand Down Expand Up @@ -728,19 +729,22 @@ if (SSL_LINK)
endif ()
target_link_libraries(arcticdb_core_object PUBLIC ${arcticdb_core_libraries})

message(STATUS "Main SPARROW_INCLUDE_DIR: ${SPARROW_INCLUDE_DIR}")

target_include_directories(arcticdb_core_object
PRIVATE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>
${arcticdb_core_includes}
${SPARROW_INCLUDE_DIR}
)

add_library(arcticdb_core_static STATIC $<TARGET_OBJECTS:arcticdb_core_object>)

target_include_directories(arcticdb_core_static PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>
${SPARROW_INCLUDE_DIR}
)

target_link_libraries(arcticdb_core_static PUBLIC ${arcticdb_core_libraries})
Expand Down Expand Up @@ -793,6 +797,7 @@ target_include_directories(arcticdb_python PRIVATE
${LIBBSONCXX_STATIC_INCLUDE_DIRS}
${BITMAGIC_INCLUDE_DIRS}
${LMDB_LIBRARIES}
${SPARROW_INCLUDE_DIR}
)


Expand Down Expand Up @@ -842,6 +847,7 @@ target_include_directories(arcticdb_ext
${BITMAGIC_INCLUDE_DIRS}
${PYTHON_NUMPY_INCLUDE_DIR}
${AWSSDK_INCLUDE_DIRS}
${SPARROW_INCLUDE_DIR}
)

install(TARGETS arcticdb_ext
Expand Down Expand Up @@ -873,6 +879,7 @@ if(${TEST})
python_utils_dump_vars_if_enabled("Python for test compilation")

set(unit_test_srcs
arrow/test/test_arrow.cpp
async/test/test_async.cpp
codec/test/test_codec.cpp
codec/test/test_encode_field_collection.cpp
Expand Down Expand Up @@ -1017,6 +1024,7 @@ if(${TEST})
${LIBMONGOCXX_STATIC_INCLUDE_DIRS}
${LIBBSONCXX_STATIC_INCLUDE_DIRS}
${BITMAGIC_INCLUDE_DIRS}
${SPARROW_INCLUDE_DIR}
)
endif()

Expand Down
53 changes: 53 additions & 0 deletions cpp/arcticdb/arrow/arrow_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#pragma once

#include <sparrow/array/array_data.hpp>
#include <sparrow/array.hpp>
#include <sparrow/arrow_interface/array_data_to_arrow_array_converters.hpp>
#include <sparrow/arrow_interface/arrow_array/smart_pointers.hpp>
#include <sparrow/external_array.hpp>

#include <arcticdb/column_store/memory_segment.hpp>


namespace arcticdb {

sparrow::arrow_array_unique_ptr arrow_data_from_column(const Column& column) {
return column.type().visit_tag([&](auto && impl) -> sparrow::arrow_array_unique_ptr {
using TagType = std::decay_t<decltype(impl)>;
using DataType = TagType::DataTypeTag;
using RawType = DataType::raw_type;
if constexpr (!is_sequence_type(DataType::data_type)) {
sparrow::array_data data;
data.type = sparrow::data_descriptor(sparrow::arrow_traits<RawType>::type_id);
auto column_data = column.data();
util::check(column_data.num_blocks() == 1, "Expected single block in arrow conversion");
auto block = column_data.next<TagType>().value();
const auto row_count = block.row_count();
sparrow::buffer<RawType> buffer(const_cast<RawType *>(block.release()), row_count);
data.buffers.push_back(buffer);
data.length = static_cast<std::int64_t>(row_count);
data.offset = static_cast<std::int64_t>(0);
return to_arrow_array_unique_ptr(std::move(data));
} else {
util::raise_rte("Sequence types not implemented");
}
});
};

std::vector<sparrow::arrow_array_unique_ptr> segment_to_arrow_arrays(SegmentInMemory& segment) {
std::vector<sparrow::arrow_array_unique_ptr> output;
for(auto& column : segment.columns()) {
output.emplace_back(arrow_data_from_column(*column));
}
return output;
}


} // namespace arcticdb
44 changes: 44 additions & 0 deletions cpp/arcticdb/arrow/test/test_arrow.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#include <gtest/gtest.h>

#include <arcticdb/util/test/generators.hpp>
#include <arcticdb/arrow/arrow_utils.hpp>

TEST(Arrow, ConvertColumn) {
using namespace arcticdb;
using TDT = TypeDescriptorTag<DataTypeTag<DataType::UINT16>, DimensionTag<Dimension ::Dim0>>;
Column column(static_cast<TypeDescriptor>(TDT{}), 0, AllocationType::DETACHABLE, Sparsity::NOT_PERMITTED);
for(auto i= 0; i < 10; ++i) {
column.set_scalar<uint16_t>(i, i);
}

auto data = arrow_data_from_column(column);
ASSERT_EQ(data->n_buffers, 2);
ASSERT_EQ(data->offset, 0);
ASSERT_EQ(data->n_children, 0);
}

TEST(Arrow, ConvertSegment) {
using namespace arcticdb;
auto desc = stream_descriptor(StreamId{"id"}, stream::RowCountIndex{}, {
scalar_field(DataType::UINT8, "uint8"),
scalar_field(DataType::UINT32, "uint32")});

SegmentInMemory segment(desc, 20, AllocationType::DETACHABLE, Sparsity::NOT_PERMITTED, DataTypeMode::INTERNAL);

auto col1_ptr = segment.column(0).buffer().data();
auto col2_ptr = reinterpret_cast<uint32_t*>(segment.column(1).buffer().data());
for(auto j = 0; j < 20; ++j ) {
*col1_ptr++ = j;
*col2_ptr++ = j * 2;
}

auto vec = segment_to_arrow_arrays(segment);
ASSERT_EQ(vec.size(), 2);
}
28 changes: 18 additions & 10 deletions cpp/arcticdb/column_store/chunked_buffer.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@


/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
Expand All @@ -22,14 +23,14 @@
namespace arcticdb {

/*
* ChunkedBufferImpl is an untyped buffer that is composed of blocks of data that can be either regularly or
* ChunkedBuffer is an untyped buffer that is composed of blocks of data that can be either regularly or
* irregularly sized, with optimizations for the following representations:
*
* - a single block
* - multiple blocks that are all regularly sized
* - multiple blocks that are regularly sized up until a point, and irregularly sized after that point
*
* No optimization is performed when the blocks are all irregularly sized.
* Lookup is log(n) where n is the number of blocks when the blocks are all irregularly sized.
*
* This class can be wrapped in a cursor for the purposes of linear reads and writes (see CursoredBuffer),
* and subsequently detached if required.
Expand Down Expand Up @@ -95,9 +96,11 @@ class ChunkedBufferImpl {
reserve(size);
}

ChunkedBufferImpl(size_t size, entity::AllocationType allocation_type) {
ChunkedBufferImpl(size_t size, entity::AllocationType allocation_type) :
allocation_type_(allocation_type) {
if(allocation_type == entity::AllocationType::DETACHABLE) {
add_detachable_block(size);
bytes_ = size;
} else {
reserve(size);
}
Expand Down Expand Up @@ -171,6 +174,7 @@ class ChunkedBufferImpl {
swap(left.regular_sized_until_, right.regular_sized_until_);
swap(left.blocks_, right.blocks_);
swap(left.block_offsets_, right.block_offsets_);
swap(left.allocation_type_, right.allocation_type_);
}

[[nodiscard]] const auto &blocks() const { return blocks_; }
Expand All @@ -192,8 +196,10 @@ class ChunkedBufferImpl {
res = last_block().end();
last_block().bytes_ += extra_size;
} else {
// Still regular-sized, add a new block
if (is_regular_sized()) {
if(allocation_type_ == entity::AllocationType::DETACHABLE) {
util::check(blocks_.empty(), "Multiple detachable blocks not supported");
add_detachable_block(std::max(requested_size, DefaultBlockSize));
} else if (is_regular_sized()) {
auto space = free_space();
if (extra_size <= DefaultBlockSize && (space == 0 || aligned)) {
if (aligned && space > 0) {
Expand Down Expand Up @@ -378,18 +384,19 @@ class ChunkedBufferImpl {
auto [ptr, ts] = Allocator::aligned_alloc(sizeof(MemBlock));
new(ptr) MemBlock(data, size, offset, ts, false);
blocks_.emplace_back(reinterpret_cast<BlockType*>(ptr));
bytes_ += size;
}

void add_detachable_block(size_t size) {
void add_detachable_block(size_t capacity) {
if(capacity == 0)
return;

if (!no_blocks() && last_block().empty())
free_last_block();

auto [ptr, ts] = Allocator::aligned_alloc(sizeof(MemBlock));
auto* data = reinterpret_cast<uint8_t*>(malloc(size));
new(ptr) MemBlock(data, size, 0UL, ts, true);
auto* data = reinterpret_cast<uint8_t*>(malloc(capacity));
new(ptr) MemBlock(data, capacity, 0UL, ts, true);
blocks_.emplace_back(reinterpret_cast<BlockType*>(ptr));
bytes_ += size;
}

[[nodiscard]] bool empty() const { return bytes_ == 0; }
Expand Down Expand Up @@ -476,6 +483,7 @@ class ChunkedBufferImpl {
#else
std::vector<BlockType*> blocks_;
std::vector<size_t> block_offsets_;
entity::AllocationType allocation_type_ = entity::AllocationType::DYNAMIC;
#endif
};

Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/column_store/column_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ struct TypedBlockData {
return nbytes_;
}

[[nodiscard]] const raw_type* release() {
return reinterpret_cast<const raw_type*>(const_cast<MemBlock*>(block_)->release());
}

[[nodiscard]] std::size_t row_count() const {
return row_count_;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/entity/output_type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <cstdint>

namespace arcticdb {
enum class OutputType : uint8_t {
enum class OutputFormat : uint8_t {
PANDAS,
ARROW,
PARQUET
Expand Down
6 changes: 3 additions & 3 deletions cpp/arcticdb/pipeline/read_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct ReadOptions {
std::optional<bool> set_tz_;
std::optional<bool> optimise_string_memory_;
std::optional<bool> batch_throw_on_error_;
OutputType output_type_ = OutputType::PANDAS;
OutputFormat output_format_ = OutputFormat::PANDAS;

void set_force_strings_to_fixed(const std::optional<bool>& force_strings_to_fixed) {
force_strings_to_fixed_ = force_strings_to_fixed;
Expand Down Expand Up @@ -60,8 +60,8 @@ struct ReadOptions {
batch_throw_on_error_ = batch_throw_on_error;
}

void set_output_type(OutputType output_type) {
output_type_ = output_type;
void set_output_format(OutputFormat output_format) {
output_format_ = output_format;
}
};
} //namespace arcticdb
9 changes: 7 additions & 2 deletions cpp/arcticdb/version/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,19 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
}))
.def(py::init([](py::array value_list){
return std::make_shared<ValueSet>(value_list);
}))
;
}));

py::class_<VersionQuery>(version, "PythonVersionStoreVersionQuery")
.def(py::init())
.def("set_snap_name", &VersionQuery::set_snap_name)
.def("set_timestamp", &VersionQuery::set_timestamp)
.def("set_version", &VersionQuery::set_version);

py::enum_<OutputFormat>(version, "OutputFormat")
.value("PANDAS", OutputFormat::PANDAS)
.value("ARROW", OutputFormat::ARROW)
.value("PARQUET", OutputFormat::PARQUET);

py::class_<ReadOptions>(version, "PythonVersionStoreReadOptions")
.def(py::init())
.def("set_force_strings_to_object", &ReadOptions::set_force_strings_to_object)
Expand All @@ -177,6 +181,7 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
.def("set_set_tz", &ReadOptions::set_set_tz)
.def("set_optimise_string_memory", &ReadOptions::set_optimise_string_memory)
.def("set_batch_throw_on_error", &ReadOptions::set_batch_throw_on_error)
.def("set_output_format", &ReadOptions::set_output_format)
.def_property_readonly("incompletes", &ReadOptions::get_incompletes);

version.def("write_dataframe_to_file", &write_dataframe_to_file);
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/test/test_version_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ TEST(VersionMap, CompactionUpdateCache) {
version_map->write_version(store, key, std::nullopt);
}

auto assert_keys_in_entry_and_store = [&store, &id](std::shared_ptr<VersionMapEntry> entry, int expected_version_keys, int expected_index_keys, int expected_tombstone_keys){
auto assert_keys_in_entry_and_store = [&store](std::shared_ptr<VersionMapEntry> entry, int expected_version_keys, int expected_index_keys, int expected_tombstone_keys){
int present_version_keys = 0, present_index_keys = 0, present_tombstone_keys = 0;
auto all_entry_keys = entry->keys_;
if (entry->head_) all_entry_keys.push_back(entry->head_.value());
Expand Down
1 change: 1 addition & 0 deletions cpp/third_party/date
Submodule date added at 1ead67
1 change: 1 addition & 0 deletions cpp/third_party/sparrow
Submodule sparrow added at 6b0cff
2 changes: 2 additions & 0 deletions python/arcticdb/version_store/_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from arcticdb_ext.version_store import StreamDescriptorMismatch
from arcticdb_ext.version_store import DataError
from arcticdb_ext.version_store import sorted_value_name
from arcticdb_ext.version_store import OutputFormat
from arcticdb.authorization.permissions import OpenMode
from arcticdb.exceptions import ArcticDbNotYetImplemented, ArcticNativeException
from arcticdb.flattener import Flattener
Expand Down Expand Up @@ -1624,6 +1625,7 @@ def _get_read_options(self, **kwargs):
read_options = _PythonVersionStoreReadOptions()
read_options.set_force_strings_to_object(_assume_false("force_string_to_object", kwargs))
read_options.set_optimise_string_memory(_assume_false("optimise_string_memory", kwargs))
read_options.set_output_format(kwargs.get("output_format"), default=OutputFormat.PANDAS)
read_options.set_dynamic_schema(
self.resolve_defaults("dynamic_schema", proto_cfg, global_default=False, **kwargs)
)
Expand Down
11 changes: 11 additions & 0 deletions python/tests/unit/arcticdb/version_store/test_arrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from arcticdb_ext.version_store import OutputFormat
import pandas as pd
import numpy as np


def test_basic_roundtrip(lmdb_version_store_v1):
lib = lmdb_version_store_v1
df = pd.DataFrame({"x": np.arange(10)})
lib.write("arrow", df)
vit = lib.read("arrow", output_format=OutputFormat.ARROW)
print(vit)

0 comments on commit 9483af3

Please sign in to comment.