Skip to content

Commit

Permalink
GH-35837: [C++] Acero will hang if StopProducing is called while backpressure is applied on the source node (#35902)
Browse files Browse the repository at this point in the history

### What changes are included in this PR?

The stopping logic for `SourceNode` now ensures the node is first resumed.

### Are these changes tested?

Yes. A new `source_node_test` test is added for this.

### Are there any user-facing changes?

No.

**This PR contains a "Critical Fix".**

* Closes: #35837 

Authored-by: Yaron Gvili <rtpsw@hotmail.com>
Signed-off-by: Li Jin <ice.xelloss@gmail.com>
  • Loading branch information
rtpsw authored Jun 16, 2023
1 parent b6dabdf commit e39386b
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 1 deletion.
1 change: 1 addition & 0 deletions cpp/src/arrow/acero/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ add_arrow_acero_test(plan_test
plan_test.cc
test_nodes_test.cc
test_nodes.cc)
# Regression test for GH-35837 (StopProducing called while the source node is paused)
add_arrow_acero_test(source_node_test SOURCES source_node_test.cc test_nodes.cc)
add_arrow_acero_test(fetch_node_test SOURCES fetch_node_test.cc test_nodes.cc)
add_arrow_acero_test(order_by_node_test SOURCES order_by_node_test.cc test_nodes.cc)
add_arrow_acero_test(hash_join_node_test SOURCES hash_join_node_test.cc
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ class ARROW_ACERO_EXPORT ExecNode {
///
/// This is not a pause. There will be no way to start the source again after this has
/// been called.
Status StopProducing();
virtual Status StopProducing();

std::string ToString(int indent = 0) const;

Expand Down
18 changes: 18 additions & 0 deletions cpp/src/arrow/acero/source_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,28 @@ struct SourceNode : ExecNode, public TracedNode {
return;
}
to_finish = backpressure_future_;
backpressure_future_ = Future<>::MakeFinished();
}
to_finish.MarkFinished();
}

/// \brief Stop this source node, first releasing a pending backpressure pause.
///
/// GH-35837: if the backpressure future is still unfinished when the stop is
/// requested, it is swapped for an already-finished one and then completed,
/// so that the node is not left paused. Only afterwards is the call delegated
/// to ExecNode::StopProducing().
///
/// \return the status returned by ExecNode::StopProducing()
Status StopProducing() override {
  std::unique_lock<std::mutex> lock(mutex_);
  const bool was_paused = !backpressure_future_.is_finished();
  Future<> paused_future;
  if (was_paused) {
    // Keep a handle on the pending future and install a finished one in its place.
    paused_future = backpressure_future_;
    backpressure_future_ = Future<>::MakeFinished();
  }
  lock.unlock();

  // The pending future is completed only after mutex_ has been released.
  if (was_paused) {
    paused_future.MarkFinished();
  }

  // Only then stop.
  return ExecNode::StopProducing();
}

Status StopProducingImpl() override {
std::unique_lock<std::mutex> lock(mutex_);
stop_requested_ = true;
Expand Down
161 changes: 161 additions & 0 deletions cpp/src/arrow/acero/source_node_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/map_node.h"
#include "arrow/acero/options.h"
#include "arrow/acero/test_nodes.h"

namespace arrow {
namespace acero {

/// \brief Options for the pause-then-stop test nodes defined in this file.
struct PauseThenStopNodeOptions : public ExecNodeOptions {
  /// Initial value of the node's batch countdown (see PauseThenStopNode).
  int num_pass;

  /// \param[in] num_pass initial value of the batch countdown
  explicit PauseThenStopNodeOptions(int num_pass) : num_pass{num_pass} {}
};

/// \brief Pass-through test node that pauses its input and then requests a stop.
///
/// Every batch is forwarded unchanged. While processing the batch on which the
/// countdown `num_pass` equals 1, the node first calls PauseProducing on its
/// single input and then invokes `ThisNode::DoStopProducing()`.
///
/// \tparam ThisNode the concrete node type deriving from this template (CRTP).
///         It must provide `kKindName` and a `Status DoStopProducing()` member.
template <typename ThisNode>
struct PauseThenStopNode : public MapNode {
  /// \param[in] plan the plan that owns this node
  /// \param[in] inputs the input nodes (Make() validates there is exactly one)
  /// \param[in] output_schema schema of the forwarded batches
  /// \param[in] options supplies the initial value of the countdown
  PauseThenStopNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
                    std::shared_ptr<Schema> output_schema,
                    const PauseThenStopNodeOptions& options)
      : MapNode(plan, inputs, output_schema), num_pass(options.num_pass) {}

  /// \brief Factory used when registering the node with the exec factory registry.
  ///
  /// \param[in] options must actually be a PauseThenStopNodeOptions; the cast
  ///            below is unchecked
  /// \return the newly emplaced node, or Invalid if `num_pass` is less than 2
  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
                                const ExecNodeOptions& options) {
    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, ThisNode::kKindName));
    // Bind by reference: a plain `auto` would decay and copy the options object.
    const auto& bp_options = static_cast<const PauseThenStopNodeOptions&>(options);
    if (bp_options.num_pass < 2) {
      return Status::Invalid("num_pass must be at least 2");
    }
    return plan->EmplaceNode<ThisNode>(plan, inputs, inputs[0]->output_schema(),
                                       bp_options);
  }

  const char* kind_name() const override { return ThisNode::kKindName; }

  /// \brief Forward `batch` unchanged, triggering pause-then-stop when the
  /// countdown reaches 1.
  ///
  /// The triggering batch is still returned to the caller unless
  /// DoStopProducing() fails, in which case that error is returned instead.
  Result<ExecBatch> ProcessBatch(ExecBatch batch) override {
    if (num_pass == 1) {
      // Pause the input first so that the stop arrives while it is paused.
      inputs()[0]->PauseProducing(this, 1);
      ARROW_RETURN_NOT_OK(static_cast<ThisNode*>(this)->DoStopProducing());
    }
    // Never go below zero, so the trigger above fires at most once.
    if (num_pass > 0) --num_pass;
    return batch;
  }

  /// Remaining countdown; the pause-then-stop sequence runs when it equals 1.
  int num_pass;
};

/// \brief Pause-then-stop node whose stop request targets only its input node.
///
/// `kPlanExitsEarly` is false: the test using this node reads the plan output
/// until the end of the stream.
struct PauseThenStopInputNode : public PauseThenStopNode<PauseThenStopInputNode> {
  using PauseThenStopNode::PauseThenStopNode;

  static constexpr const char* kKindName = "PauseThenStopInputNode";
  static constexpr const char* kFactoryName = "pause_then_stop_input";
  static constexpr bool kPlanExitsEarly = false;

  /// \brief Add this node's factory to the default registry unless a factory
  /// is already registered under `kFactoryName`.
  static Status Register() {
    auto* registry = default_exec_factory_registry();
    if (registry->GetFactory(kFactoryName).ok()) {
      // A factory with this name already exists; nothing to do.
      return Status::OK();
    }
    ARROW_RETURN_NOT_OK(
        registry->AddFactory(kFactoryName, PauseThenStopInputNode::Make));
    return Status::OK();
  }

  /// \brief Request a stop on the (single) input node only.
  Status DoStopProducing() { return inputs()[0]->StopProducing(); }
};

/// \brief Pause-then-stop node whose stop request targets the whole plan.
///
/// `kPlanExitsEarly` is true: the test using this node expects reading the
/// plan output to end with a Cancelled error.
struct PauseThenStopPlanNode : public PauseThenStopNode<PauseThenStopPlanNode> {
  using PauseThenStopNode::PauseThenStopNode;

  static constexpr const char* kKindName = "PauseThenStopPlanNode";
  static constexpr const char* kFactoryName = "pause_then_stop_plan";
  static constexpr bool kPlanExitsEarly = true;

  /// \brief Add this node's factory to the default registry unless a factory
  /// is already registered under `kFactoryName`.
  static Status Register() {
    auto* registry = default_exec_factory_registry();
    if (registry->GetFactory(kFactoryName).ok()) {
      // A factory with this name already exists; nothing to do.
      return Status::OK();
    }
    ARROW_RETURN_NOT_OK(
        registry->AddFactory(kFactoryName, PauseThenStopPlanNode::Make));
    return Status::OK();
  }

  /// \brief Request a stop on the owning plan; always reports OK.
  Status DoStopProducing() {
    plan()->StopProducing();
    return Status::OK();
  }
};

// GH-35837
// This tests that the plan shuts down cleanly when `StopProducing` is sent while
// backpressure is applied to the source node.
//
// `ThisNode` selects the control node placed after the source. It must provide
// `Register()`, `kFactoryName` and `kPlanExitsEarly`.
template <typename ThisNode>
void TestPauseThenStop() {
  ASSERT_OK(ThisNode::Register());

  // Number of batches made available to the source.
  constexpr int num_batches = 10;
  // Countdown given to the control node: it pauses and stops while processing
  // the `num_pass`-th batch.
  constexpr int num_pass = 2;
  // Number of rows in each batch.
  constexpr int batch_size = 1;

  // The above constants can be changed subject to the following restrictions
  // to ensure that the test works. They are checked at compile time so that an
  // invalid edit cannot build. The lower bound on num_pass matches the check in
  // the control node's factory, which rejects values below 2.
  static_assert(num_pass >= 2, "must pass at least one batch before pausing");
  static_assert(num_batches > num_pass, "must have more batches after pausing");

  auto t_schema = schema({field("time", int32()), field("value", int32())});
  ASSERT_OK_AND_ASSIGN(auto t_batches,
                       MakeIntegerBatches({[](int row) -> int64_t { return row; },
                                           [](int row) -> int64_t { return row + 1; }},
                                          t_schema, num_batches, batch_size));

  Declaration t_src = {
      "source", SourceNodeOptions(t_batches.schema,
                                  MakeDelayedGen(t_batches, "t_src", /*delay_sec=*/0.5,
                                                 /*noisy=*/false))};
  Declaration ctrl = {
      ThisNode::kFactoryName, {t_src}, PauseThenStopNodeOptions(num_pass)};

  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
                       DeclarationToReader(ctrl, /*use_threads=*/false));

  if (!ThisNode::kPlanExitsEarly) {
    // Only the input was stopped: the stream must end normally after exactly
    // `num_pass` batches have been delivered.
    int64_t total_length = 0;
    for (;;) {
      ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
      if (!batch) {
        break;
      }
      total_length += batch->num_rows();
    }
    // Widen before multiplying so the product is computed in 64 bits.
    ASSERT_EQ(static_cast<int64_t>(num_pass) * batch_size, total_length);
  } else {
    // The whole plan was stopped: the batches before the triggering one are
    // readable, then the reader must report cancellation.
    for (int i = 0; i < num_pass - 1; i++) {
      ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
      ASSERT_TRUE(batch);
    }
    ASSERT_RAISES(Cancelled, batch_reader->Next());
  }
}

// GH-35837: pause the source, then stop only the source node.
TEST(SourceNode, PauseThenStopInput) {
  TestPauseThenStop<PauseThenStopInputNode>();
}

// GH-35837: pause the source, then stop the whole plan.
TEST(SourceNode, PauseThenStopPlan) {
  TestPauseThenStop<PauseThenStopPlanNode>();
}

} // namespace acero
} // namespace arrow

0 comments on commit e39386b

Please sign in to comment.