Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-35786: [C++] Add pairwise_diff function #35787

Merged
merged 17 commits into from
Jun 29, 2023
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ if(ARROW_COMPUTE)
compute/kernels/scalar_validity.cc
compute/kernels/vector_array_sort.cc
compute/kernels/vector_cumulative_ops.cc
compute/kernels/vector_pairwise.cc
compute/kernels/vector_nested.cc
compute/kernels/vector_rank.cc
compute/kernels/vector_replace.cc
Expand Down
17 changes: 17 additions & 0 deletions cpp/src/arrow/compute/api_vector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/reflection_internal.h"

namespace arrow {

Expand Down Expand Up @@ -150,6 +151,8 @@ static auto kRankOptionsType = GetFunctionOptionsType<RankOptions>(
DataMember("sort_keys", &RankOptions::sort_keys),
DataMember("null_placement", &RankOptions::null_placement),
DataMember("tiebreaker", &RankOptions::tiebreaker));
static auto kPairwiseOptionsType = GetFunctionOptionsType<PairwiseOptions>(
DataMember("periods", &PairwiseOptions::periods));
} // namespace
} // namespace internal

Expand Down Expand Up @@ -217,6 +220,10 @@ RankOptions::RankOptions(std::vector<SortKey> sort_keys, NullPlacement null_plac
tiebreaker(tiebreaker) {}
constexpr char RankOptions::kTypeName[];

PairwiseOptions::PairwiseOptions(int64_t periods)
: FunctionOptions(internal::kPairwiseOptionsType), periods(periods) {}
constexpr char PairwiseOptions::kTypeName[];

namespace internal {
void RegisterVectorOptions(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunctionOptionsType(kFilterOptionsType));
Expand All @@ -229,6 +236,7 @@ void RegisterVectorOptions(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunctionOptionsType(kSelectKOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kCumulativeOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kRankOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kPairwiseOptionsType));
}
} // namespace internal

Expand Down Expand Up @@ -338,6 +346,15 @@ Result<std::shared_ptr<StructArray>> ValueCounts(const Datum& value, ExecContext
return checked_pointer_cast<StructArray>(result.make_array());
}

Result<std::shared_ptr<Array>> PairwiseDiff(const Array& array,
const PairwiseOptions& options,
bool check_overflow, ExecContext* ctx) {
auto func_name = check_overflow ? "pairwise_diff_checked" : "pairwise_diff";
ARROW_ASSIGN_OR_RAISE(Datum result,
CallFunction(func_name, {Datum(array)}, &options, ctx));
return result.make_array();
}

// ----------------------------------------------------------------------
// Filter- and take-related selection functions

Expand Down
33 changes: 33 additions & 0 deletions cpp/src/arrow/compute/api_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,17 @@ class ARROW_EXPORT CumulativeOptions : public FunctionOptions {
};
using CumulativeSumOptions = CumulativeOptions; // For backward compatibility

/// \brief Options for pairwise functions
class ARROW_EXPORT PairwiseOptions : public FunctionOptions {
public:
explicit PairwiseOptions(int64_t periods = 1);
static constexpr char const kTypeName[] = "PairwiseOptions";
static PairwiseOptions Defaults() { return PairwiseOptions(); }

/// Periods to shift for applying the binary operation, accepts negative values.
int64_t periods = 1;
};

/// @}

/// \brief Filter with a boolean selection filter
Expand Down Expand Up @@ -650,6 +661,28 @@ Result<Datum> CumulativeMin(
const Datum& values, const CumulativeOptions& options = CumulativeOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Return the first order difference of an array.
///
/// Computes the first order difference of an array, i.e.
/// output[i] = input[i] - input[i - p] if i >= p
/// output[i] = null otherwise
/// where p is the period. For example, with p = 1,
/// Diff([1, 4, 9, 10, 15]) = [null, 3, 5, 1, 5].
/// With p = 2,
/// Diff([1, 4, 9, 10, 15]) = [null, null, 8, 6, 6]
/// p can also be negative, in which case the diff is computed in
/// the opposite direction.
/// \param[in] array array input
/// \param[in] options options, specifying overflow behavior and period
/// \param[in] check_overflow whether to return error on overflow
/// \param[in] ctx the function execution context, optional
/// \return result as array
ARROW_EXPORT
Result<std::shared_ptr<Array>> PairwiseDiff(const Array& array,
const PairwiseOptions& options,
bool check_overflow = false,
ExecContext* ctx = NULLPTR);

// ----------------------------------------------------------------------
// Deprecated functions

Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ struct ARROW_EXPORT ExecResult {
const std::shared_ptr<ArrayData>& array_data() const {
return std::get<std::shared_ptr<ArrayData>>(this->value);
}
ArrayData* array_data_mutable() {
return std::get<std::shared_ptr<ArrayData>>(this->value).get();
}

bool is_array_data() const { return this->value.index() == 1; }
};
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/compute/kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,16 @@ class ARROW_EXPORT OutputType {
///
/// This function SHOULD _not_ be used to check for arity, that is to be
/// performed one or more layers above.
using Resolver = Result<TypeHolder> (*)(KernelContext*, const std::vector<TypeHolder>&);
using Resolver =
std::function<Result<TypeHolder>(KernelContext*, const std::vector<TypeHolder>&)>;

/// \brief Output an exact type
OutputType(std::shared_ptr<DataType> type) // NOLINT implicit construction
: kind_(FIXED), type_(std::move(type)) {}

/// \brief Output a computed type depending on actual input types
OutputType(Resolver resolver) // NOLINT implicit construction
template <typename Fn>
OutputType(Fn resolver) // NOLINT implicit construction
: kind_(COMPUTED), resolver_(std::move(resolver)) {}

OutputType(const OutputType& other) {
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/kernels/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ add_arrow_benchmark(scalar_temporal_benchmark PREFIX "arrow-compute")
add_arrow_compute_test(vector_test
SOURCES
vector_cumulative_ops_test.cc
vector_pairwise_test.cc
vector_hash_test.cc
vector_nested_test.cc
vector_replace_test.cc
Expand Down
182 changes: 182 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_pairwise.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Vector kernels for pairwise computation

#include <iostream>
#include <memory>
#include "arrow/builder.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/function.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/kernels/base_arithmetic_internal.h"
#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/compute/registry.h"
#include "arrow/compute/util.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/visit_type_inline.h"

namespace arrow::compute::internal {

// We reuse the kernel exec function of a scalar binary function to compute pairwise
// results. For example, for pairwise_diff, we reuse subtract's kernel exec.
struct PairwiseState : KernelState {
PairwiseState(const PairwiseOptions& options, const ArrayKernelExec& scalar_exec)
: periods(options.periods), scalar_exec(scalar_exec) {}

int64_t periods;
const ArrayKernelExec& scalar_exec;
};

KernelInit GeneratePairwiseInit(const ArrayKernelExec& scalar_exec) {
return [&scalar_exec](KernelContext* ctx, const KernelInitArgs& args) {
return std::make_unique<PairwiseState>(
checked_cast<const PairwiseOptions&>(*args.options), scalar_exec);
};
}

/// A generic pairwise implementation that can be reused by different ops.
Status PairwiseExecImpl(KernelContext* ctx, const ArraySpan& input,
const ArrayKernelExec& scalar_exec, int64_t periods,
ArrayData* result) {
auto offset = abs(periods);
offset = std::min(offset, input.length);
auto exec_length = input.length - offset;
// prepare bitmap
auto null_start = periods > 0 ? 0 : exec_length;
auto non_null_start = periods > 0 ? offset : 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: could we call these two regions "computed" and "margin"? I think that'll be more obvious for the next maintainer, and it'd help if we move all the start/length calculation to this preamble too

Suggested change
auto offset = abs(periods);
offset = std::min(offset, input.length);
auto exec_length = input.length - offset;
// prepare bitmap
auto null_start = periods > 0 ? 0 : exec_length;
auto non_null_start = periods > 0 ? offset : 0;
// We only compute values in the region where the input-with-offset overlaps
// the original input. The margin where these do not overlap gets filled with null.
auto margin_length = std::min(abs(periods), input.length);
auto computed_length = input.length - margin_length;
auto margin_start = periods > 0 ? 0 : computed_length;
auto left_start = periods > 0 ? margin_length : 0;
auto right_start = periods > 0 ? 0 : margin_length;
// ...
// prepare bitmap

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. It's much clearer now. thanks for the suggestion!

bit_util::ClearBitmap(result->buffers[0]->mutable_data(), null_start, offset);
for (int64_t i = non_null_start; i < non_null_start + exec_length; i++) {
if (input.IsValid(i) && input.IsValid(i - periods)) {
bit_util::SetBit(result->buffers[0]->mutable_data(), i);
} else {
bit_util::ClearBit(result->buffers[0]->mutable_data(), i);
}
}
// prepare input span
ArraySpan left(input);
left.SetSlice(periods > 0 ? offset : 0, exec_length);
ArraySpan right(input);
right.SetSlice(periods > 0 ? 0 : offset, exec_length);
// prepare output span
ArraySpan output_span;
output_span.SetMembers(*result);
output_span.offset = periods > 0 ? offset : 0;
output_span.length = exec_length;
ExecResult output{output_span};
// execute scalar function
RETURN_NOT_OK(scalar_exec(ctx, ExecSpan({left, right}, exec_length), &output));

return Status::OK();
}

Status PairwiseExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
const auto& state = checked_cast<const PairwiseState&>(*ctx->state());
auto input = batch[0].array;
RETURN_NOT_OK(PairwiseExecImpl(ctx, batch[0].array, state.scalar_exec, state.periods,
out->array_data_mutable()));
return Status::OK();
}

const FunctionDoc pairwise_diff_doc(
"Compute first order difference of an array",
("Computes the first order difference of an array, It internally calls \n"
"the scalar function \"subtract\" to compute \n differences, so its \n"
"behavior and supported types are the same as \n"
"\"subtract\". The period can be specified in :struct:`PairwiseOptions`.\n"
"\n"
"Results will wrap around on integer overflow. Use function \n"
"\"pairwise_diff_checked\" if you want overflow to return an error."),
{"input"}, "PairwiseOptions");

const FunctionDoc pairwise_diff_checked_doc(
"Compute first order difference of an array",
("Computes the first order difference of an array, It internally calls \n"
"the scalar function \"subtract_checked\" (or the checked variant) to compute \n"
"differences, so its behavior and supported types are the same as \n"
"\"subtract_checked\". The period can be specified in :struct:`PairwiseOptions`.\n"
"\n"
"This function returns an error on overflow. For a variant that doesn't \n"
"fail on overflow, use function \"pairwise_diff\"."),
{"input"}, "PairwiseOptions");

const PairwiseOptions* GetDefaultPairwiseOptions() {
static const auto kDefaultPairwiseOptions = PairwiseOptions::Defaults();
return &kDefaultPairwiseOptions;
}

struct PairwiseKernelData {
InputType input;
OutputType output;
ArrayKernelExec exec;
};

void RegisterPairwiseDiffKernels(std::string_view func_name,
std::string_view base_func_name, const FunctionDoc& doc,
FunctionRegistry* registry) {
VectorKernel base_kernel;
base_kernel.can_execute_chunkwise = false;
base_kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE;
base_kernel.mem_allocation = MemAllocation::PREALLOCATE;
base_kernel.init = OptionsWrapper<PairwiseOptions>::Init;
auto func = std::make_shared<VectorFunction>(std::string(func_name), Arity::Unary(),
doc, GetDefaultPairwiseOptions());

auto base_func_result = registry->GetFunction(std::string(base_func_name));
DCHECK_OK(base_func_result.status());
const auto& base_func = checked_cast<const ScalarFunction&>(**base_func_result);
DCHECK_EQ(base_func.arity().num_args, 2);

for (const auto& base_func_kernel : base_func.kernels()) {
const auto& base_func_kernel_sig = base_func_kernel->signature;
if (base_func_kernel_sig->in_types()[0].Equals(base_func_kernel_sig->in_types()[1])) {
OutputType out_type(base_func_kernel_sig->out_type());
// Need to wrap base output resolver
if (out_type.kind() == OutputType::COMPUTED) {
const auto& base_resolver = base_func_kernel_sig->out_type().resolver();
auto resolver = [&base_resolver](KernelContext* ctx,
const std::vector<TypeHolder>& input_types) {
return base_resolver(ctx, {input_types[0], input_types[0]});
};
out_type = OutputType(resolver);
}

base_kernel.signature =
KernelSignature::Make({base_func_kernel_sig->in_types()[0]}, out_type);
base_kernel.exec = PairwiseExec;
base_kernel.init = GeneratePairwiseInit(base_func_kernel->exec);
DCHECK_OK(func->AddKernel(base_kernel));
}
}

DCHECK_OK(registry->AddFunction(std::move(func)));
}

void RegisterVectorPairwise(FunctionRegistry* registry) {
RegisterPairwiseDiffKernels("pairwise_diff", "subtract", pairwise_diff_doc, registry);
RegisterPairwiseDiffKernels("pairwise_diff_checked", "subtract_checked",
pairwise_diff_checked_doc, registry);
}

} // namespace arrow::compute::internal
Loading