Skip to content

Commit

Permalink
Add tests for ext_proc stream resets (#38346)
Browse files Browse the repository at this point in the history
These tests document current broken behavior of ext_proc always resetting gRPC side streams.

Risk Level: Low
Testing: Unit Tests
Docs Changes: N/A
Release Notes: N/A
Platform Specific Features: N/A

Signed-off-by: Yan Avlasov <yavlasov@google.com>
  • Loading branch information
yanavlasov authored Feb 7, 2025
1 parent 1706a64 commit 79f28bf
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 1 deletion.
2 changes: 1 addition & 1 deletion source/common/http/codec_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void CodecClient::completeRequest(ActiveRequest& request) {
}

void CodecClient::onReset(ActiveRequest& request, StreamResetReason reason) {
ENVOY_CONN_LOG(debug, "request reset", *connection_);
ENVOY_CONN_LOG(debug, "Request reset. Reason {}", *connection_, static_cast<int>(reason));
if (codec_client_callbacks_) {
codec_client_callbacks_->onStreamReset(reason);
}
Expand Down
30 changes: 30 additions & 0 deletions test/common/grpc/grpc_client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,41 @@ INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, GrpcClientIntegrationTest,
TEST_P(GrpcClientIntegrationTest, BasicStream) {
initialize();
auto stream = createStream(empty_metadata_);
// Send request without END_STREAM set to true. By default Envoy gRPC client will reset
// the stream upon receiving response with trailers. Google gRPC client will not reset the
// stream as it by default supports independent half-close. auto stream =
// createStream(empty_metadata_);
stream->sendRequest();
stream->sendServerInitialMetadata(empty_metadata_);
stream->sendReply();
stream->sendServerTrailers(Status::WellKnownGrpcStatus::Ok, "", empty_metadata_);
dispatcher_helper_.runDispatcher();

if (clientType() == ClientType::EnvoyGrpc) { // Envoy gRPC based AsyncGrpcClient should reset
// stream, since server half-closed before client.
EXPECT_EQ(
cm_.thread_local_cluster_.cluster_.info_->trafficStats()->upstream_rq_tx_reset_.value(), 1);
stream->waitForReset();
}
}

// Validate that a simple request-reply stream works.
TEST_P(GrpcClientIntegrationTest, BasicStreamGracefulClose) {
initialize();
auto stream = createStream(empty_metadata_);
// Send request with end_stream set to true, causing gRPC client to half close
// the stream.
RequestArgs request_args{nullptr, true};
stream->sendRequest(request_args);
stream->sendServerInitialMetadata(empty_metadata_);
stream->sendReply();
stream->sendServerTrailers(Status::WellKnownGrpcStatus::Ok, "", empty_metadata_);
dispatcher_helper_.runDispatcher();

// AsyncGrpcClient should not cause local reset, completing the stream gracefully.
EXPECT_EQ(cm_.thread_local_cluster_.cluster_.info_->trafficStats()->upstream_rq_tx_reset_.value(),
0);
stream->waitForEndStream();
}

// A simple request-reply stream, "x-envoy-internal" and `x-forward-for` headers
Expand Down
9 changes: 9 additions & 0 deletions test/common/grpc/grpc_client_integration_test_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,19 @@ class HelloworldStream : public MockAsyncStreamCallbacks<helloworld::HelloReply>

void closeStream() {
grpc_stream_->closeStream();
waitForEndStream();
}

void waitForEndStream() {
AssertionResult result = fake_stream_->waitForEndStream(dispatcher_helper_.dispatcher_);
RELEASE_ASSERT(result, result.message());
}

void waitForReset() {
AssertionResult result = fake_stream_->waitForReset(dispatcher_helper_.dispatcher_);
RELEASE_ASSERT(result, result.message());
}

DispatcherHelper& dispatcher_helper_;
FakeStream* fake_stream_{};
AsyncStream<helloworld::HelloRequest> grpc_stream_{};
Expand Down
44 changes: 44 additions & 0 deletions test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,45 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStreamOnResponse) {
verifyDownstreamResponse(*response, 500);
}

TEST_P(ExtProcIntegrationTest, OnlyRequestHeaders) {
// Skip the header processing on response path.
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
initializeConfig();
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequest(absl::nullopt);

processRequestHeadersMessage(
*grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& headers_resp) {
// The response does not really matter, it just needs to be non-empty.
auto response_header_mutation = headers_resp.mutable_response()->mutable_header_mutation();
auto* mut1 = response_header_mutation->add_set_headers();
mut1->mutable_header()->set_key("x-new-header");
mut1->mutable_header()->set_raw_value("new");
return true;
});
// ext_proc is configured to only send request headers. In this case, server indicates that it is
// not expecting any more messages from ext_proc filter and half-closes the stream.
processor_stream_->finishGrpcStream(Grpc::Status::Ok);

// ext_proc will immediately close side stream in this case, because by default Envoy gRPC client
// will reset the stream if the server half-closes before the client. Note that the ext_proc
// filter has not yet half-closed the sidestream, since it is doing it during its destruction.
// TODO(yanavlasov): Enable independent half-close for Envoy gRPC client and remove this check.
// TODO(yanavlasov): Reset in Google gRPC case is unexpected. Diagnose and fix.
EXPECT_TRUE(processor_stream_->waitForReset());

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));

EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new"));

upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);
upstream_request_->encodeData(100, true);

verifyDownstreamResponse(*response, 200);
}

// Test the filter using the default configuration by connecting to
// an ext_proc server that responds to the request_headers message
// by requesting to modify the request headers.
Expand Down Expand Up @@ -2106,6 +2145,11 @@ TEST_P(ExtProcIntegrationTest, GetAndRespondImmediately) {
hdr2->mutable_header()->set_raw_value("application/json");
});

// ext_proc will immediately close side stream in this case, which causes it to be reset,
// since side stream codec had not yet observed server trailers.
// TODO(yanavlasov): Separate lifetimes of ext_proc and sidestream.
EXPECT_TRUE(processor_stream_->waitForReset());

verifyDownstreamResponse(*response, 401);
EXPECT_THAT(response->headers(), SingleHeaderValueIs("x-failure-reason", "testing"));
EXPECT_THAT(response->headers(), SingleHeaderValueIs("content-type", "application/json"));
Expand Down
11 changes: 11 additions & 0 deletions test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,17 @@ AssertionResult FakeStream::waitForReset(milliseconds timeout) {
return AssertionSuccess();
}

AssertionResult FakeStream::waitForReset(Event::Dispatcher& client_dispatcher,
std::chrono::milliseconds timeout) {
absl::MutexLock lock(&lock_);
if (!waitForWithDispatcherRun(
time_system_, lock_, [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return saw_reset_; },
client_dispatcher, timeout)) {
return AssertionFailure() << "Timed out waiting for reset of stream.";
}
return AssertionSuccess();
}

void FakeStream::startGrpcStream(bool send_headers) {
ASSERT(!grpc_stream_started_, "gRPC stream should not be started more than once");
grpc_stream_started_ = true;
Expand Down
5 changes: 5 additions & 0 deletions test/integration/fake_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ class FakeStream : public Http::RequestDecoder,
testing::AssertionResult
waitForReset(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

ABSL_MUST_USE_RESULT
testing::AssertionResult
waitForReset(Event::Dispatcher& client_dispatcher,
std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);

// gRPC convenience methods.
void startGrpcStream(bool send_headers = true);
void finishGrpcStream(Grpc::Status::GrpcStatus status);
Expand Down

0 comments on commit 79f28bf

Please sign in to comment.