Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cpp-client): Honor Arrow TimeUnit when converting Timestamp and Time64 to ColumnSource #6609

Merged
merged 1 commit into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp-client/deephaven/dhclient/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ set(ALL_FILES
include/private/deephaven/client/impl/util.h

src/arrowutil/arrow_client_table.cc
src/arrowutil/arrow_column_source.cc
include/private/deephaven/client/arrowutil/arrow_client_table.h
include/private/deephaven/client/arrowutil/arrow_column_source.h
include/private/deephaven/client/arrowutil/arrow_value_converter.h
Expand Down Expand Up @@ -107,6 +108,7 @@ set(ALL_FILES
src/utility/table_maker.cc

include/public/deephaven/client/utility/arrow_util.h
include/public/deephaven/client/utility/internal_types.h
include/public/deephaven/client/utility/misc_types.h
include/public/deephaven/client/utility/table_maker.h
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,22 @@ namespace internal {
// null-ness by determining whether the optional has a value.
// kTimestamp is its own special case, where nullness is determined by the underlying nanos
// being equal to Deephaven's NULL_LONG.
// kLocalDate and kLocalTime are like kTimestamp except they resolve to different data types.
// kLocalDate and kLocalTime are similar to kTimestamp in nullness except they resolve to different
// data types.
enum class ArrowProcessingStyle {
  kNormal,
  kBooleanOrString,
  // Null when the underlying nanos equal Deephaven's NULL_LONG.
  kTimestamp,
  // Nullness is handled similarly to kTimestamp, but resolves to a different data type.
  kLocalDate,
  // Nullness is handled similarly to kTimestamp, but resolves to a different data type.
  kLocalTime
};

/**
 * Calculates the factor by which the raw values stored in 'array' must be multiplied in order
 * to express them in nanoseconds.
 *
 * When 'array' has dynamic type arrow::TimestampArray or arrow::Time64Array, look at the
 * underlying time resolution of the arrow type and calculate a conversion factor from that unit
 * to nanoseconds. For example if the underlying time unit is arrow::TimeUnit::MILLI, then the
 * conversion factor would be 1_000_000, meaning that one needs to multiply incoming millisecond
 * values by one million to convert them to nanoseconds. If 'array' is not one of those types,
 * return 1.
 *
 * NOTE(review): the definition dispatches through an arrow::TypeVisitor that overrides Visit for
 * a fixed list of Arrow types only. Confirm what happens (return 1 vs. an error) for an array
 * whose type is outside that list.
 *
 * @param array The Arrow array
 * @return For supported time types, the conversion factor to nanoseconds. Otherwise, 1.
 * @throws std::runtime_error if the array's time unit is not one of SECOND, MILLI, MICRO, NANO.
 */
size_t CalcTimeNanoScaleFactor(const arrow::Array &array);

template<ArrowProcessingStyle Style, typename TColumnSourceBase, typename TArrowArray, typename TChunk>
class GenericArrowColumnSource final : public TColumnSourceBase {
using BooleanChunk = deephaven::dhcore::chunk::BooleanChunk;
Expand All @@ -37,7 +50,8 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
using UInt64Chunk = deephaven::dhcore::chunk::UInt64Chunk;

public:
static std::shared_ptr<GenericArrowColumnSource> OfArrowArray(std::shared_ptr<TArrowArray> array) {
static std::shared_ptr<GenericArrowColumnSource>
OfArrowArray(std::shared_ptr<TArrowArray> array) {
std::vector<std::shared_ptr<TArrowArray>> arrays{std::move(array)};
return OfArrowArrayVec(std::move(arrays));
}
Expand All @@ -48,7 +62,9 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
}

explicit GenericArrowColumnSource(std::vector<std::shared_ptr<TArrowArray>> arrays) :
arrays_(std::move(arrays)) {}
arrays_(std::move(arrays)) {
time_nano_scale_factor_ = arrays_.empty() ? 1 : CalcTimeNanoScaleFactor(*arrays_.front());
}

~GenericArrowColumnSource() final = default;

Expand All @@ -67,13 +83,14 @@ class GenericArrowColumnSource final : public TColumnSourceBase {

// This algorithm is a little tricky because the source data and RowSequence are both
// segmented, perhaps in different ways.
auto *typed_dest = VerboseCast<TChunk*>(DEEPHAVEN_LOCATION_EXPR(dest_data));
auto *typed_dest = VerboseCast<TChunk *>(DEEPHAVEN_LOCATION_EXPR(dest_data));
auto *destp = typed_dest->data();
auto outerp = arrays_.begin();
size_t src_segment_begin = 0;
size_t src_segment_end = (*outerp)->length();

auto *null_destp = optional_dest_null_flags != nullptr ? optional_dest_null_flags->data() : nullptr;
auto *null_destp =
optional_dest_null_flags != nullptr ? optional_dest_null_flags->data() : nullptr;

rows.ForEachInterval([&](uint64_t requested_segment_begin, uint64_t requested_segment_end) {
while (true) {
Expand Down Expand Up @@ -147,11 +164,12 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
const auto *src_endp = innerp->raw_values() + relative_end;

for (const auto *ip = src_beginp; ip != src_endp; ++ip) {
*destp = DateTime::FromNanos(*ip);
auto is_null = *ip == DeephavenTraits<int64_t>::kNullValue;
*destp = DateTime::FromNanos(is_null ? *ip : (*ip * time_nano_scale_factor_));
++destp;

if (null_destp != nullptr) {
*null_destp = *ip == DeephavenTraits<int64_t>::kNullValue;
*null_destp = is_null;
++null_destp;
}
}
Expand All @@ -175,11 +193,12 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
const auto *src_endp = innerp->raw_values() + relative_end;

for (const auto *ip = src_beginp; ip != src_endp; ++ip) {
*destp = LocalTime::FromNanos(*ip);
auto is_null = *ip == DeephavenTraits<int64_t>::kNullValue;
*destp = LocalTime::FromNanos(is_null ? *ip : (*ip * time_nano_scale_factor_));
++destp;

if (null_destp != nullptr) {
*null_destp = *ip == DeephavenTraits<int64_t>::kNullValue;
*null_destp = is_null;
++null_destp;
}
}
Expand All @@ -200,6 +219,18 @@ class GenericArrowColumnSource final : public TColumnSourceBase {

private:
std::vector<std::shared_ptr<TArrowArray>> arrays_;
/**
* This value is valid for Style == ArrowProcessingStyle::kTimestamp and
* ArrowProcessingStyle::kLocalTime, and ignored for other ArrowProcessingStyle enumeration
* values.
*
* These ArrowProcessingStyles come into play when processing the arrow types
* arrow::TimestampType and arrow::Time64Type respectively.
*
* The value stores a conversion factor from whatever the input scale is to nanoseconds.
* For example, if the input timescale is milliseconds, this value will be 1_000_000.
*/
size_t time_nano_scale_factor_ = 1;
};
} // namespace internal

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
*/
#pragma once

#include <arrow/flight/types.h>

namespace deephaven::client::utility {
/**
* For Deephaven use only
*/
namespace internal {
/**
 * Test-support type. The regular DateTime class has a native time resolution of nanoseconds;
 * this wrapper instead carries a raw value expressed in the Arrow time unit UNIT, so that the
 * unit tests can upload timestamps at other resolutions and confirm that the client and server
 * both handle them correctly.
 *
 * @tparam UNIT The Arrow time unit in which value_ is expressed.
 */
template<arrow::TimeUnit::type UNIT>
struct InternalDateTime {
  // Raw timestamp value, in units of UNIT.
  int64_t value_ = 0;

  explicit InternalDateTime(int64_t value) : value_{value} {}
};

/**
 * Test-support type. The regular LocalTime class has a native time resolution of nanoseconds;
 * this wrapper instead carries a raw value expressed in the Arrow time unit UNIT, so that the
 * unit tests can upload times at other resolutions and confirm that the client and server both
 * handle them correctly.
 *
 * @tparam UNIT The Arrow time unit in which value_ is expressed. Must be MICRO or NANO.
 */
template<arrow::TimeUnit::type UNIT>
struct InternalLocalTime {
  // Arrow Time64 only supports micro and nano units, so reject anything else at compile time.
  static_assert(UNIT == arrow::TimeUnit::NANO || UNIT == arrow::TimeUnit::MICRO);

  // Raw time value, in units of UNIT.
  int64_t value_ = 0;

  explicit InternalLocalTime(int64_t value) : value_{value} {}
};
} // namespace internal
} // namespace deephaven::client::utility
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#include "deephaven/client/client.h"
#include "deephaven/client/utility/arrow_util.h"
#include "deephaven/client/utility/internal_types.h"
#include "deephaven/client/utility/misc_types.h"
#include "deephaven/dhcore/types.h"
#include "deephaven/dhcore/utility/utility.h"
#include "deephaven/third_party/fmt/format.h"

Expand Down Expand Up @@ -356,6 +359,38 @@ struct TypeConverterTraits<std::optional<T>> {
}
};

/**
 * TypeConverterTraits for the test-only InternalDateTime<UNIT>. Values are carried in an Arrow
 * timestamp column whose time unit is UNIT and whose timezone is "UTC".
 */
template<arrow::TimeUnit::type UNIT>
struct TypeConverterTraits<deephaven::client::utility::internal::InternalDateTime<UNIT>> {
  // The Deephaven type name associated with this column type.
  static std::string_view GetDeephavenTypeName() {
    return "java.time.ZonedDateTime";
  }

  // The Arrow data type: a timestamp with resolution UNIT in the "UTC" timezone.
  static std::shared_ptr<arrow::DataType> GetDataType() {
    auto data_type = arrow::timestamp(UNIT, "UTC");
    return data_type;
  }

  // A builder for the data type above, allocating from Arrow's default memory pool.
  static arrow::TimestampBuilder GetBuilder() {
    auto *pool = arrow::default_memory_pool();
    return arrow::TimestampBuilder(GetDataType(), pool);
  }

  // Extracts the raw int64 value (in units of UNIT) that is appended to the builder.
  static int64_t Reinterpret(const deephaven::client::utility::internal::InternalDateTime<UNIT> &o) {
    const int64_t raw_value = o.value_;
    return raw_value;
  }
};

/**
 * TypeConverterTraits for the test-only InternalLocalTime<UNIT>. Values are carried in an Arrow
 * time64 column whose time unit is UNIT.
 */
template<arrow::TimeUnit::type UNIT>
struct TypeConverterTraits<deephaven::client::utility::internal::InternalLocalTime<UNIT>> {
  // The Deephaven type name associated with this column type.
  static std::string_view GetDeephavenTypeName() {
    return "java.time.LocalTime";
  }

  // The Arrow data type: a time64 with resolution UNIT.
  static std::shared_ptr<arrow::DataType> GetDataType() {
    auto data_type = arrow::time64(UNIT);
    return data_type;
  }

  // A builder for the data type above, allocating from Arrow's default memory pool.
  static arrow::Time64Builder GetBuilder() {
    auto *pool = arrow::default_memory_pool();
    return arrow::Time64Builder(GetDataType(), pool);
  }

  // Extracts the raw int64 value (in units of UNIT) that is appended to the builder.
  static int64_t Reinterpret(const deephaven::client::utility::internal::InternalLocalTime<UNIT> &o) {
    const int64_t raw_value = o.value_;
    return raw_value;
  }
};

template<typename T>
TypeConverter TypeConverter::CreateNew(const std::vector<T> &values) {
using deephaven::client::utility::OkOrThrow;
Expand Down
87 changes: 87 additions & 0 deletions cpp-client/deephaven/dhclient/src/arrowutil/arrow_column_source.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
*/
#include "deephaven/client/arrowutil/arrow_column_source.h"
#include "deephaven/client/utility/arrow_util.h"

using deephaven::client::utility::OkOrThrow;

namespace deephaven::client::arrowutil {
namespace internal {

namespace {
struct NanoScaleFactorVisitor final : public arrow::TypeVisitor {
size_t result_ = 1;

arrow::Status Visit(const arrow::Int8Type &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Int16Type &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Int32Type &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Int64Type &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::FloatType &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::DoubleType &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::BooleanType &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::UInt16Type &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::StringType &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::TimestampType &type) final {
result_ = ScaleFromUnit(type.unit());
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Date64Type &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Time64Type &type) final {
result_ = ScaleFromUnit(type.unit());
return arrow::Status::OK();
}

static size_t ScaleFromUnit(arrow::TimeUnit::type unit) {
switch (unit) {
case arrow::TimeUnit::SECOND: return 1'000'000'000;
case arrow::TimeUnit::MILLI: return 1'000'000;
case arrow::TimeUnit::MICRO: return 1'000;
case arrow::TimeUnit::NANO: return 1;
default: {
auto message = fmt::format("Unhandled arrow::TimeUnit {}", static_cast<size_t>(unit));
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
}
}
};
} // namespace

// Computes the nanosecond conversion factor for 'array' by running a NanoScaleFactorVisitor
// over the array's Arrow type and returning the factor that the visitor recorded.
size_t CalcTimeNanoScaleFactor(const arrow::Array &array) {
// The visitor's result_ starts at 1 and is only changed by its TimestampType and Time64Type
// overloads, which derive the factor from the type's time unit.
NanoScaleFactorVisitor visitor;
// The status returned by Accept is handed to OkOrThrow (presumably raising an exception when the
// status is not OK; confirm against its definition).
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(array.type()->Accept(&visitor)));
return visitor.result_;
}
} // namespace internal
} // namespace deephaven::client::arrowutil
14 changes: 2 additions & 12 deletions cpp-client/deephaven/dhclient/src/utility/arrow_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ struct ArrowToElementTypeId final : public arrow::TypeVisitor {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::TimestampType &type) final {
if (type.unit() != arrow::TimeUnit::NANO) {
auto message = fmt::format("Expected TimestampType with nano units, got {}",
type.ToString());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
arrow::Status Visit(const arrow::TimestampType &/*type*/) final {
type_id_ = ElementTypeId::kTimestamp;
return arrow::Status::OK();
}
Expand All @@ -96,12 +91,7 @@ struct ArrowToElementTypeId final : public arrow::TypeVisitor {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Time64Type &type) final {
if (type.unit() != arrow::TimeUnit::NANO) {
auto message = fmt::format("Expected Time64Type with nano units, got {}",
type.ToString());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
arrow::Status Visit(const arrow::Time64Type &/*type*/) final {
type_id_ = ElementTypeId::kLocalTime;
return arrow::Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,6 @@ class LocalTime {
/**
* Converts nanoseconds-since-start-of-day to LocalTime. The Deephaven null value sentinel is
* turned into LocalTime(0).
* TODO(kosak): find out null convention
* @param nanos Nanoseconds since the start of the day.
* @return The corresponding LocalTime.
*/
Expand Down Expand Up @@ -579,7 +578,6 @@ class LocalTime {
return !(lhs == rhs);
}
};

} // namespace deephaven::dhcore

template<> struct fmt::formatter<deephaven::dhcore::DateTime> : ostream_formatter {};
Expand Down
1 change: 1 addition & 0 deletions cpp-client/deephaven/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ add_executable(dhclient_tests
src/table_test.cc
src/test_util.cc
src/ticking_test.cc
src/time_unit_test.cc
src/ungroup_test.cc
src/update_by_test.cc
src/utility_test.cc
Expand Down
Loading