Fix performance regression in Reduce operators introduced by PR #7206 (#7719)

* improves the ArgMin implementation
* updates the parallelization cost (see the sketch of the new cost helper below)
* chooses the former implementation for the KRK case when K=1
* improves the unit test
xadupre authored May 24, 2021
1 parent f487f6b commit a41255c
Showing 3 changed files with 161 additions and 107 deletions.
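The central API change below is that ParallelReduceFastCost gains an n_ops argument, so each call site can state how many arithmetic operations one reduced element costs instead of relying on the previous hard-coded factor of 2. The following minimal sketch restates the revised helper in a self-contained form; SimpleTensorOpCost and the main driver are illustrative stand-ins, and reading the three fields as bytes loaded, bytes stored, and compute cost is an assumption about onnxruntime's TensorOpCost, not something stated in this diff.

#include <cstdint>
#include <cstdio>

// Stand-in for onnxruntime's TensorOpCost: the three fields are assumed to be
// bytes loaded, bytes stored, and an estimate of the compute work.
struct SimpleTensorOpCost {
  double bytes_loaded;
  double bytes_stored;
  double compute_cycles;
};

// Mirrors the updated helper in this commit: the caller now supplies n_ops,
// the per-element operation count, instead of a hard-coded factor of 2.
SimpleTensorOpCost ParallelReduceFastCost(int64_t n_row, int64_t n_col,
                                          int64_t element_size, int n_ops) {
  return SimpleTensorOpCost{static_cast<double>(n_row * n_col * element_size),
                            static_cast<double>(n_row * element_size),
                            static_cast<double>(n_row * n_col * element_size * n_ops)};
}

int main() {
  // Example: one row of 4096 floats reduced by a cheap aggregator (n_ops = 6).
  SimpleTensorOpCost c = ParallelReduceFastCost(1, 4096, sizeof(float), 6);
  std::printf("load=%.0f store=%.0f compute=%.0f\n",
              c.bytes_loaded, c.bytes_stored, c.compute_cycles);
  return 0;
}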
189 changes: 122 additions & 67 deletions onnxruntime/core/providers/cpu/reduction/reduction_ops.cc
@@ -277,10 +277,10 @@ void ReduceAggregatorBase::FastReduceKRK(const Tensor&, const std::vector<int64_
ValidateMustBeOverloaded();
}

TensorOpCost ParallelReduceFastCost(int64_t n_row, int64_t n_col, int64_t element_size) {
return TensorOpCost{static_cast<double>(n_col * n_row * element_size),
TensorOpCost ParallelReduceFastCost(int64_t n_row, int64_t n_col, int64_t element_size, int n_ops) {
return TensorOpCost{static_cast<double>(n_row * n_col * element_size),
static_cast<double>(n_row * element_size),
static_cast<double>(n_col * n_row * element_size * 2)};
static_cast<double>(n_row * n_col * element_size * n_ops)};
}

void NoTransposePrepareForReduce(const TensorShape& new_input_shape,
@@ -384,6 +384,15 @@ void ValidateNoTransposeReduce(int64_t count) {
ORT_ENFORCE(count == 1, "Reduction on all axes, output size should be 1.");
}

template <typename AGG>
struct ParallelizedData {
int64_t denominator;
int64_t loop_size;
ResultsNoTransposePrepareForReduce* last_results;
const typename AGG::input_type* from_data;
typename AGG::value_type* to_data;
};

template <typename AGG>
void NoTransposeReduce1Loop(Tensor* output, const TensorShape& new_input_shape, const Tensor& input,
const std::vector<int64_t>& reduced_axes, concurrency::ThreadPool* tp,
@@ -406,35 +415,47 @@ void NoTransposeReduce1Loop(Tensor* output, const TensorShape& new_input_shape,
return;
}
last_results.ValidateNotEmpty();
int64_t denominator = last_results.last_loop_red_size * last_results.projected_index.size();

auto fn = [&](std::ptrdiff_t first, std::ptrdiff_t end) {
int64_t loop;
ParallelizedData<AGG> data;
data.denominator = last_results.last_loop_red_size * last_results.projected_index.size();
data.loop_size = last_results.last_loop_red_size * last_results.last_loop_red_inc;
data.last_results = &last_results;
data.from_data = from_data;
data.to_data = to_data;

auto fn = [&data](std::ptrdiff_t first, std::ptrdiff_t end) {
const typename AGG::input_type* loop_red_ptr;
const typename AGG::input_type* loop_red_ptr_end;
int64_t current_index = first * last_results.last_loop_size;
for (int64_t main_index = first; main_index < end; ++main_index) {
for (loop = 0; loop < last_results.last_loop_size; ++loop, ++current_index) {
int64_t origin = last_results.unprojected_index[main_index] + loop * last_results.last_loop_inc;
AGG accumulator(denominator, from_data[origin + last_results.projected_index[0]]);
for (auto it = last_results.projected_index.begin(); it != last_results.projected_index.end(); ++it) {
loop_red_ptr = from_data + (origin + *it);
loop_red_ptr_end = loop_red_ptr + last_results.last_loop_red_size * last_results.last_loop_red_inc;
for (; loop_red_ptr != loop_red_ptr_end; loop_red_ptr += last_results.last_loop_red_inc) {
accumulator.update(*loop_red_ptr);
}
const ResultsNoTransposePrepareForReduce& last_results = *data.last_results;
int64_t main_index = first / last_results.last_loop_size;
int64_t loop = first % last_results.last_loop_size;
int64_t origin = last_results.unprojected_index[main_index] + loop * last_results.last_loop_inc;
for (int64_t main_index_last_loop = first; main_index_last_loop < end; ++main_index_last_loop) {
AGG accumulator(data.denominator, data.from_data[origin + last_results.projected_index[0]]);
for (auto it = last_results.projected_index.begin(); it != last_results.projected_index.end(); ++it) {
loop_red_ptr = data.from_data + (origin + *it);
for (int64_t red = 0; red < data.loop_size; red += last_results.last_loop_red_inc) {
accumulator.update(loop_red_ptr[red]);
}
}
data.to_data[main_index_last_loop] = accumulator.get_value();

++loop;
if (loop >= last_results.last_loop_size) {
loop = 0;
++main_index;
if (main_index < static_cast<int64_t>(last_results.unprojected_index.size())) {
origin = last_results.unprojected_index[main_index];
}
to_data[current_index] = accumulator.get_value();
} else {
origin += last_results.last_loop_inc;
}
}
};

auto cost = TensorOpCost{(double)(last_results.projected_index.size() * sizeof(typename AGG::input_type) *
last_results.last_loop_size * last_results.last_loop_red_size),
(double)last_results.last_loop_size * last_results.last_loop_red_size,
(double)last_results.projected_index.size() * last_results.last_loop_size *
last_results.last_loop_red_size};
concurrency::ThreadPool::TryParallelFor(tp, count / last_results.last_loop_size, cost, fn);
auto cost = ParallelReduceFastCost(1,
last_results.projected_index.size() * last_results.last_loop_red_size,
sizeof(typename AGG::input_type), 6);
concurrency::ThreadPool::TryParallelFor(tp, count, cost, fn);
}
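The rewritten lambda above no longer nests a loop over last_loop_size inside the parallel range: TryParallelFor is now given count work items, and the lambda derives its starting (main_index, loop) pair from the flat index, then advances origin incrementally. A standalone sketch of that index walk, with hypothetical sizes, is:

#include <cstdint>
#include <cstdio>
#include <vector>

// Illustrative only: walk a flat range [first, end) the same way the new
// reduction lambda does, decomposing it into (main_index, loop) and keeping
// an incrementally updated origin instead of recomputing it per element.
void WalkRange(std::ptrdiff_t first, std::ptrdiff_t end,
               const std::vector<int64_t>& unprojected_index,
               int64_t last_loop_size, int64_t last_loop_inc) {
  int64_t main_index = first / last_loop_size;
  int64_t loop = first % last_loop_size;
  int64_t origin = unprojected_index[main_index] + loop * last_loop_inc;
  for (std::ptrdiff_t i = first; i < end; ++i) {
    std::printf("output[%td] reads from origin %lld\n", i, (long long)origin);
    ++loop;
    if (loop >= last_loop_size) {  // move to the next unprojected block
      loop = 0;
      ++main_index;
      if (main_index < (int64_t)unprojected_index.size())
        origin = unprojected_index[main_index];
    } else {  // stay in the same block, step by the inner stride
      origin += last_loop_inc;
    }
  }
}

int main() {
  // Hypothetical sizes: 3 blocks of 4 inner elements each, inner stride 2.
  std::vector<int64_t> unprojected_index = {0, 100, 200};
  WalkRange(5, 9, unprojected_index, /*last_loop_size=*/4, /*last_loop_inc=*/2);
  return 0;
}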

template <typename AGG>
@@ -459,42 +480,54 @@ void NoTransposeReduce2Loops(Tensor* output, const TensorShape& new_input_shape,
return;
}
last_results.ValidateNotEmpty();
int64_t denominator = last_results.last_loop_red_size * last_results.projected_index.size();

ParallelizedData<AGG> data;
data.denominator = last_results.last_loop_red_size * last_results.projected_index.size();
data.loop_size = last_results.last_loop_red_size * last_results.last_loop_red_inc;
data.last_results = &last_results;
data.from_data = from_data;
data.to_data = to_data;

auto fn = [&](std::ptrdiff_t first, std::ptrdiff_t end) {
int64_t loop;
const typename AGG::input_type* loop_red_ptr;
const typename AGG::input_type* loop_red_ptr_end;
int64_t current_index = first * last_results.last_loop_size;
for (int64_t main_index = first; main_index < end; ++main_index) {
for (loop = 0; loop < last_results.last_loop_size; ++loop, ++current_index) {
int64_t origin = last_results.unprojected_index[main_index] + loop * last_results.last_loop_inc;
AGG accumulator(denominator, from_data[origin + last_results.projected_index[0]]);
for (auto it = last_results.projected_index.begin(); it != last_results.projected_index.end(); ++it) {
loop_red_ptr = from_data + (origin + *it);
loop_red_ptr_end = loop_red_ptr + last_results.last_loop_red_size * last_results.last_loop_red_inc;
for (; loop_red_ptr != loop_red_ptr_end; loop_red_ptr += last_results.last_loop_red_inc) {
accumulator.update0(*loop_red_ptr);
}
const ResultsNoTransposePrepareForReduce& last_results = *data.last_results;
int64_t main_index = first / last_results.last_loop_size;
int64_t loop = first % last_results.last_loop_size;
int64_t origin = last_results.unprojected_index[main_index] + loop * last_results.last_loop_inc;
for (int64_t main_index_last_loop = first; main_index_last_loop < end; ++main_index_last_loop) {
AGG accumulator(data.denominator, data.from_data[origin + last_results.projected_index[0]]);
for (auto it = last_results.projected_index.begin(); it != last_results.projected_index.end(); ++it) {
loop_red_ptr = data.from_data + (origin + *it);
for (int64_t red = 0; red < data.loop_size; red += last_results.last_loop_red_inc) {
accumulator.update0(loop_red_ptr[red]);
}
for (auto it = last_results.projected_index.begin(); it != last_results.projected_index.end(); ++it) {
loop_red_ptr = from_data + (origin + *it);
loop_red_ptr_end = loop_red_ptr + last_results.last_loop_red_size * last_results.last_loop_red_inc;
for (; loop_red_ptr != loop_red_ptr_end; loop_red_ptr += last_results.last_loop_red_inc) {
accumulator.update(*loop_red_ptr);
}
}

for (auto it = last_results.projected_index.begin(); it != last_results.projected_index.end(); ++it) {
loop_red_ptr = data.from_data + (origin + *it);
for (int64_t red = 0; red < data.loop_size; red += last_results.last_loop_red_inc) {
accumulator.update(loop_red_ptr[red]);
}
}
data.to_data[main_index_last_loop] = accumulator.get_value();

++loop;
if (loop >= last_results.last_loop_size) {
loop = 0;
++main_index;
if (main_index < static_cast<int64_t>(last_results.unprojected_index.size())) {
origin = last_results.unprojected_index[main_index];
}
to_data[current_index] = accumulator.get_value();
} else {
origin += last_results.last_loop_inc;
}
}
};

auto cost = TensorOpCost{(double)(last_results.projected_index.size() * sizeof(typename AGG::input_type) *
last_results.last_loop_size * last_results.last_loop_red_size),
(double)last_results.last_loop_size * last_results.last_loop_red_size,
(double)last_results.projected_index.size() * last_results.last_loop_size *
last_results.last_loop_red_size * 2};
concurrency::ThreadPool::TryParallelFor(tp, count / last_results.last_loop_size, cost, fn);
auto cost = ParallelReduceFastCost(1,
last_results.projected_index.size() * last_results.last_loop_red_size,
sizeof(typename AGG::input_type), 8);
concurrency::ThreadPool::TryParallelFor(tp, count, cost, fn);
}
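NoTransposeReduce2Loops applies the same index bookkeeping but visits the reduced elements twice per output: a first pass through update0 and a second pass through update. The aggregator below is a hypothetical example of that two-pass interface (a log-sum-exp-style reduction that finds the maximum first, then accumulates stabilized exponentials); it is chosen purely for illustration and is not taken from this diff.

#include <cmath>
#include <cstdint>
#include <cstdio>

// Hypothetical two-pass aggregator matching the AGG interface used by
// NoTransposeReduce2Loops: every element is fed to update0() in a first pass
// and to update() in a second pass, then get_value() produces the result.
struct LogSumExpAggregator {
  double max_val;
  double sum;
  LogSumExpAggregator(int64_t /*denominator*/, double first) : max_val(first), sum(0.0) {}
  void update0(double v) { if (v > max_val) max_val = v; }  // pass 1: running max
  void update(double v) { sum += std::exp(v - max_val); }   // pass 2: stabilized sum
  double get_value() const { return std::log(sum) + max_val; }
};

int main() {
  const double data[] = {1.0, 3.0, 2.0};
  LogSumExpAggregator acc(3, data[0]);
  for (double v : data) acc.update0(v);  // first loop
  for (double v : data) acc.update(v);   // second loop
  std::printf("logsumexp = %f\n", acc.get_value());  // ~3.407606
  return 0;
}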

void DropDimensions(const std::vector<int64_t>& input_shape,
@@ -660,6 +693,7 @@ bool CommonFastReduceSwitch(OpKernelContext* ctx,
fast_kind = OptimizeShapeForFastReduce(
reduced_dims, input_axes.empty() ? axes_ : input_axes,
fast_shape, output_shape, fast_axes, keepdims_, noop_with_empty_axes);

if (which_fast_reduce != FastReduceKind::kNone) {
if (IsFastReduceKindAvailable(fast_kind, which_fast_reduce)) {
Tensor* output = ctx->Output(0, output_shape);
@@ -671,14 +705,25 @@ }
}
case FastReduceKind::kRK: {
ValidateFastReduceRK(fast_shape, *output);
case_rk(*input, fast_shape, *output, ctx->GetOperatorThreadPool());
return true;
if ((fast_shape[0] > concurrency::ThreadPool::DegreeOfParallelism(ctx->GetOperatorThreadPool()) * 16) &&
(std::max(fast_shape[0], fast_shape[1]) >
concurrency::ThreadPool::DegreeOfParallelism(ctx->GetOperatorThreadPool()) * 256)) {
// See benchmarks in PR #7719.
case_rk(*input, fast_shape, *output, ctx->GetOperatorThreadPool());
return true;
} else {
break;
}
}
case FastReduceKind::kKRK: {
case FastReduceKind::kKRK:
ValidateFastReduceKRK(fast_shape, *output);
case_krk(*input, fast_shape, *output, ctx->GetOperatorThreadPool());
return true;
}
if (fast_shape[0] >= std::max(2, concurrency::ThreadPool::DegreeOfParallelism(ctx->GetOperatorThreadPool()))) {
// See benchmarks in PR #7719.
case_krk(*input, fast_shape, *output, ctx->GetOperatorThreadPool());
return true;
} else {
break;
}
case FastReduceKind::kR:
case FastReduceKind::kK:
case FastReduceKind::kNone:
@@ -700,7 +745,8 @@ bool CommonFastReduce(OpKernelContext* ctx,
std::vector<int64_t>& fast_shape,
std::vector<int64_t>& output_shape,
std::vector<int64_t>& fast_axes) {
return CommonFastReduceSwitch(ctx, axes_, keepdims_, noop_with_empty_axes, fast_kind, fast_shape, output_shape, fast_axes,
return CommonFastReduceSwitch(ctx, axes_, keepdims_, noop_with_empty_axes,
fast_kind, fast_shape, output_shape, fast_axes,
AGG::WhichFastReduce(), &AGG::FastReduceKR, &AGG::FastReduceRK, &AGG::FastReduceKRK);
}

@@ -869,16 +915,25 @@ std::unique_ptr<Tensor> ReduceSum<T>::Impl(const Tensor& input, const std::vecto
ReduceAggregatorSum<T>::FastReduceKR(input, fast_shape, *output, tp);
return output;
}
case FastReduceKind::kRK: {
case FastReduceKind::kRK:
ValidateFastReduceRK(fast_shape, *output);
ReduceAggregatorSum<T>::FastReduceRK(input, fast_shape, *output, tp);
return output;
}
case FastReduceKind::kKRK: {
if (std::max(fast_shape[0], fast_shape[1]) >
concurrency::ThreadPool::DegreeOfParallelism(tp) * 256) {
// See benchmarks in PR #7719.
ReduceAggregatorSum<T>::FastReduceRK(input, fast_shape, *output, tp);
return output;
} else {
break;
}
case FastReduceKind::kKRK:
ValidateFastReduceKRK(fast_shape, *output);
ReduceAggregatorSum<T>::FastReduceKRK(input, fast_shape, *output, tp);
return output;
}
if (fast_shape[0] >= std::max(2, concurrency::ThreadPool::DegreeOfParallelism(tp))) {
// See benchmarks in PR #7719.
ReduceAggregatorSum<T>::FastReduceKRK(input, fast_shape, *output, tp);
return output;
} else {
break;
}
case FastReduceKind::kR:
case FastReduceKind::kK:
case FastReduceKind::kNone:
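The other substantive change in this file is the dispatch guard: the RK and KRK fast paths are no longer taken unconditionally but only when the shape is large enough relative to the thread pool's degree of parallelism (the thresholds come from the benchmarks referenced in PR #7719). The guards are condensed below into two free functions for illustration; the names and the dop value are hypothetical, and note that ReduceSum<T>::Impl uses a slightly looser RK test that checks only the max(fast_shape[0], fast_shape[1]) condition.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

// Restatement of the guards added in CommonFastReduceSwitch; `dop` stands for
// concurrency::ThreadPool::DegreeOfParallelism(tp).
bool UseFastRK(const std::vector<int64_t>& fast_shape, int dop) {
  return fast_shape[0] > static_cast<int64_t>(dop) * 16 &&
         std::max(fast_shape[0], fast_shape[1]) > static_cast<int64_t>(dop) * 256;
}

bool UseFastKRK(const std::vector<int64_t>& fast_shape, int dop) {
  return fast_shape[0] >= std::max<int64_t>(2, dop);
}

int main() {
  int dop = 8;  // hypothetical thread-pool width
  std::printf("RK  {8, 100000}: %d\n", UseFastRK({8, 100000}, dop));   // 0: only 8 rows
  std::printf("RK  {100000, 8}: %d\n", UseFastRK({100000, 8}, dop));   // 1: both thresholds met
  std::printf("KRK {16, 32, 8}: %d\n", UseFastKRK({16, 32, 8}, dop));  // 1: K >= dop
  std::printf("KRK {4, 32, 8}:  %d\n", UseFastKRK({4, 32, 8}, dop));   // 0: falls back
  return 0;
}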
20 changes: 10 additions & 10 deletions onnxruntime/core/providers/cpu/reduction/reduction_ops.h
@@ -37,7 +37,7 @@ bool operator!=(FastReduceKind a, FastReduceKind b);
bool IsFastReduceKindAvailable(FastReduceKind scenario, FastReduceKind available);

/* Evaluate the cost of parallelized FastReduce implementations. */
TensorOpCost ParallelReduceFastCost(int64_t n_row, int64_t n_col, int64_t element_size);
TensorOpCost ParallelReduceFastCost(int64_t n_row, int64_t n_col, int64_t element_size, int n_ops);

/**
This only improves reduce function when reduced axes are contiguous:
Expand Down Expand Up @@ -192,7 +192,7 @@ class ReduceAggregatorSum : public ReduceAggregator<T, TVAL> {
T* out = output.MutableData<T>();
int64_t stridei = fast_shape[1];
concurrency::ThreadPool::TryParallelFor(
tp, fast_shape[0], ParallelReduceFastCost(1, stridei, sizeof(T)),
tp, fast_shape[0], ParallelReduceFastCost(1, stridei, sizeof(T), 6),
[data, stridei, out](ptrdiff_t first, ptrdiff_t last) {
for (ptrdiff_t d = first; d < last; ++d) {
out[d] = ConstEigenVectorArrayMap<T>(data + d * stridei, stridei).sum();
@@ -209,7 +209,7 @@ class ReduceAggregatorSum : public ReduceAggregator<T, TVAL> {
int64_t n_rows = fast_shape[0];
memcpy(out, data, N * sizeof(T));
concurrency::ThreadPool::TryParallelFor(
tp, N, ParallelReduceFastCost(1, n_rows, sizeof(T)),
tp, N, ParallelReduceFastCost(1, n_rows, sizeof(T), 6),
[data, out, N, n_rows](ptrdiff_t begin, ptrdiff_t end) {
for (int64_t row = 1; row < n_rows; ++row) {
EigenVectorArrayMap<T>(out + begin, end - begin) += ConstEigenVectorArrayMap<T>(
@@ -227,7 +227,7 @@ class ReduceAggregatorSum : public ReduceAggregator<T, TVAL> {
T* out = output.MutableData<T>();
std::vector<T> one(fast_shape[1], 1);
concurrency::ThreadPool::TryParallelFor(
tp, fast_shape[0], ParallelReduceFastCost(fast_shape[1], fast_shape[2], sizeof(T)),
tp, fast_shape[0], ParallelReduceFastCost(fast_shape[1], fast_shape[2], sizeof(T), 6),
[one, data, fast_shape, stridei, strideo, out, N](ptrdiff_t begin, ptrdiff_t last) {
for (ptrdiff_t d = begin; d < last; ++d) {
math::MatMul<T>(1, N, fast_shape[1], one.data(), data + stridei * d, out + strideo * d, nullptr);
@@ -318,7 +318,7 @@ class ReduceAggregatorMax : public ReduceAggregator<T, TVAL> {
T* out = output.MutableData<T>();
int64_t stridei = fast_shape[1];
concurrency::ThreadPool::TryParallelFor(
tp, fast_shape[0], ParallelReduceFastCost(1, stridei, sizeof(T)),
tp, fast_shape[0], ParallelReduceFastCost(1, stridei, sizeof(T), 6),
[data, stridei, out](std::ptrdiff_t first, std::ptrdiff_t last) {
EigenVectorMap<T>(out + first, last - first) = ConstEigenMatrixMap<T>(
data + first * stridei, stridei, last - first)
@@ -336,7 +336,7 @@ class ReduceAggregatorMax : public ReduceAggregator<T, TVAL> {
memcpy(out, data, N * sizeof(T));

concurrency::ThreadPool::TryParallelFor(
tp, N, ParallelReduceFastCost(1, n_rows, sizeof(T)),
tp, N, ParallelReduceFastCost(1, n_rows, sizeof(T), 6),
[data, out, N, n_rows](ptrdiff_t begin, ptrdiff_t end) {
const T* p;
for (int64_t row = 1; row < n_rows; ++row) {
@@ -355,7 +355,7 @@ class ReduceAggregatorMax : public ReduceAggregator<T, TVAL> {
int64_t stridei = fast_shape[1] * fast_shape[2];
int64_t strideo = fast_shape[2];
concurrency::ThreadPool::TryParallelFor(
tp, fast_shape[0], ParallelReduceFastCost(fast_shape[1], fast_shape[2], sizeof(T)),
tp, fast_shape[0], ParallelReduceFastCost(fast_shape[1], fast_shape[2], sizeof(T), 6),
[data, fast_shape, stridei, strideo, out](ptrdiff_t begin, ptrdiff_t end) {
for (ptrdiff_t j = begin; j < end; ++j) {
EigenVectorMap<T>(out + j * strideo, strideo) =
@@ -477,7 +477,7 @@ class ReduceAggregatorMin : public ReduceAggregator<T, TVAL> {
T* out = output.MutableData<T>();
int64_t stridei = fast_shape[1];
concurrency::ThreadPool::TryParallelFor(
tp, fast_shape[0], ParallelReduceFastCost(1, stridei, sizeof(T)),
tp, fast_shape[0], ParallelReduceFastCost(1, stridei, sizeof(T), 6),
[data, stridei, out](std::ptrdiff_t first, std::ptrdiff_t last) {
EigenVectorMap<T>(out + first, last - first) = ConstEigenMatrixMap<T>(
data + first * stridei, stridei, last - first)
@@ -495,7 +495,7 @@ class ReduceAggregatorMin : public ReduceAggregator<T, TVAL> {
memcpy(out, data, N * sizeof(T));

concurrency::ThreadPool::TryParallelFor(
tp, N, ParallelReduceFastCost(1, n_rows, sizeof(T)),
tp, N, ParallelReduceFastCost(1, n_rows, sizeof(T), 6),
[data, out, N, n_rows](ptrdiff_t begin, ptrdiff_t end) {
const T* p;
for (int64_t row = 1; row < n_rows; ++row) {
@@ -514,7 +514,7 @@ class ReduceAggregatorMin : public ReduceAggregator<T, TVAL> {
int64_t stridei = fast_shape[1] * fast_shape[2];
int64_t strideo = fast_shape[2];
concurrency::ThreadPool::TryParallelFor(
tp, fast_shape[0], ParallelReduceFastCost(fast_shape[1], fast_shape[2], sizeof(T)),
tp, fast_shape[0], ParallelReduceFastCost(fast_shape[1], fast_shape[2], sizeof(T), 6),
[data, fast_shape, stridei, strideo, out](ptrdiff_t begin, ptrdiff_t end) {
for (ptrdiff_t j = begin; j < end; ++j) {
EigenVectorMap<T>(out + j * strideo, strideo) =
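For reference, the FastReduceKR pattern that these cost updates feed (one output per contiguous row, reduced through an Eigen map) looks like the simplified, single-threaded sketch below. In onnxruntime the loop body runs inside ThreadPool::TryParallelFor with the ParallelReduceFastCost(1, stridei, sizeof(T), 6) estimate; the plain loop here is only meant to show the memory access pattern.

#include <cstddef>
#include <cstdio>
#include <Eigen/Core>

// Simplified version of FastReduceKR for a sum: each output element d is the
// sum of one contiguous row of `stridei` inputs, reduced with an Eigen map.
void FastReduceKRSum(const float* data, float* out, std::ptrdiff_t n_rows,
                     std::ptrdiff_t stridei) {
  for (std::ptrdiff_t d = 0; d < n_rows; ++d) {
    out[d] = Eigen::Map<const Eigen::ArrayXf>(data + d * stridei, stridei).sum();
  }
}

int main() {
  const float data[6] = {1, 2, 3, 4, 5, 6};  // 2 rows x 3 reduced columns
  float out[2];
  FastReduceKRSum(data, out, 2, 3);
  std::printf("%g %g\n", out[0], out[1]);  // 6 15
  return 0;
}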