Skip to content

Commit

Permalink
fix retry code to fail batches instead of creating attempt if previou…
Browse files Browse the repository at this point in the history
…sly cancelled from surface (#27217) (#27222)

* fix retry code to fail batches instead of creating attempt if previously cancelled from surface

* add xDS end2end tests covering the FI use-case that triggered the bug

* fix memory leak
  • Loading branch information
markdroth authored Sep 2, 2021
1 parent 398268a commit c452e43
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 3 deletions.
19 changes: 16 additions & 3 deletions src/core/ext/filters/client_channel/retry_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@

// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
// (census filter is on top of this one)
// - add census stats for retries
// - implement hedging

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
Expand Down Expand Up @@ -539,6 +537,8 @@ class RetryFilter::CallData {
CallCombiner* call_combiner_;
grpc_call_context_element* call_context_;

grpc_error_handle cancelled_from_surface_ = GRPC_ERROR_NONE;

RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier_;

// TODO(roth): As part of implementing hedging, we will need to maintain a
Expand Down Expand Up @@ -2141,6 +2141,7 @@ RetryFilter::CallData::~CallData() {
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
GPR_ASSERT(pending_batches_[i].batch == nullptr);
}
GRPC_ERROR_UNREF(cancelled_from_surface_);
}

void RetryFilter::CallData::StartTransportStreamOpBatch(
Expand Down Expand Up @@ -2173,6 +2174,9 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
call_attempt_->CancelFromSurface(batch);
return;
}
// Save cancel_error in case subsequent batches are started.
GRPC_ERROR_UNREF(cancelled_from_surface_);
cancelled_from_surface_ = GRPC_ERROR_REF(cancel_error);
// Cancel retry timer.
if (retry_timer_pending_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
Expand Down Expand Up @@ -2201,6 +2205,15 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
}
// If we do not yet have a call attempt, create one.
if (call_attempt_ == nullptr) {
// If we were previously cancelled from the surface, cancel this
// batch instead of creating a call attempt.
if (cancelled_from_surface_ != GRPC_ERROR_NONE) {
PendingBatchClear(pending);
// Note: This will release the call combiner.
grpc_transport_stream_op_batch_finish_with_failure(
batch, GRPC_ERROR_REF(cancelled_from_surface_), call_combiner_);
return;
}
// If there is no retry policy, then commit retries immediately.
// This ensures that the code below will always jump to the fast path.
// TODO(roth): Remove this special case when we implement
Expand Down
68 changes: 68 additions & 0 deletions test/cpp/end2end/xds_end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11992,6 +11992,74 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionMaxFault) {
EXPECT_EQ(kMaxFault, num_delayed);
}

TEST_P(FaultInjectionTest, XdsFaultInjectionBidiStreamDelayOk) {
// kRpcTimeoutMilliseconds is 10s should never be reached.
const uint32_t kRpcTimeoutMilliseconds = grpc_test_slowdown_factor() * 10000;
const uint32_t kFixedDelaySeconds = 1;
const uint32_t kDelayPercentagePerHundred = 100;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Create an EDS resource
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends()},
});
balancers_[0]->ads_service()->SetEdsResource(
BuildEdsResource(args, DefaultEdsServiceName()));
// Construct the fault injection filter config
HTTPFault http_fault;
auto* delay_percentage = http_fault.mutable_delay()->mutable_percentage();
delay_percentage->set_numerator(kDelayPercentagePerHundred);
delay_percentage->set_denominator(FractionalPercent::HUNDRED);
auto* fixed_delay = http_fault.mutable_delay()->mutable_fixed_delay();
fixed_delay->set_seconds(kFixedDelaySeconds);
// Config fault injection via different setup
SetFilterConfig(http_fault);
ClientContext context;
context.set_deadline(
grpc_timeout_milliseconds_to_deadline(kRpcTimeoutMilliseconds));
auto stream = stub_->BidiStream(&context);
stream->WritesDone();
auto status = stream->Finish();
EXPECT_TRUE(status.ok()) << status.error_message() << ", "
<< status.error_details() << ", "
<< context.debug_error_string();
}

// This case catches a bug in the retry code that was triggered by a bad
// interaction with the FI code. See https://github.com/grpc/grpc/pull/27217
// for description.
TEST_P(FaultInjectionTest, XdsFaultInjectionBidiStreamDelayError) {
const uint32_t kRpcTimeoutMilliseconds = grpc_test_slowdown_factor() * 500;
const uint32_t kFixedDelaySeconds = 100;
const uint32_t kDelayPercentagePerHundred = 100;
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
// Create an EDS resource
AdsServiceImpl::EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends()},
});
balancers_[0]->ads_service()->SetEdsResource(
BuildEdsResource(args, DefaultEdsServiceName()));
// Construct the fault injection filter config
HTTPFault http_fault;
auto* delay_percentage = http_fault.mutable_delay()->mutable_percentage();
delay_percentage->set_numerator(kDelayPercentagePerHundred);
delay_percentage->set_denominator(FractionalPercent::HUNDRED);
auto* fixed_delay = http_fault.mutable_delay()->mutable_fixed_delay();
fixed_delay->set_seconds(kFixedDelaySeconds);
// Config fault injection via different setup
SetFilterConfig(http_fault);
ClientContext context;
context.set_deadline(
grpc_timeout_milliseconds_to_deadline(kRpcTimeoutMilliseconds));
auto stream = stub_->BidiStream(&context);
stream->WritesDone();
auto status = stream->Finish();
EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, status.error_code())
<< status.error_message() << ", " << status.error_details() << ", "
<< context.debug_error_string();
}

class BootstrapSourceTest : public XdsEnd2endTest {
public:
BootstrapSourceTest() : XdsEnd2endTest(4, 1) {}
Expand Down

0 comments on commit c452e43

Please sign in to comment.