diff --git a/source/common/http/codec_client.cc b/source/common/http/codec_client.cc index 77fb57a814ea..4e437da1588c 100644 --- a/source/common/http/codec_client.cc +++ b/source/common/http/codec_client.cc @@ -156,7 +156,7 @@ void CodecClient::completeRequest(ActiveRequest& request) { } void CodecClient::onReset(ActiveRequest& request, StreamResetReason reason) { - ENVOY_CONN_LOG(debug, "request reset", *connection_); + ENVOY_CONN_LOG(debug, "Request reset. Reason {}", *connection_, static_cast(reason)); if (codec_client_callbacks_) { codec_client_callbacks_->onStreamReset(reason); } diff --git a/test/common/grpc/grpc_client_integration_test.cc b/test/common/grpc/grpc_client_integration_test.cc index 4e86f295a257..d870b8b1d246 100644 --- a/test/common/grpc/grpc_client_integration_test.cc +++ b/test/common/grpc/grpc_client_integration_test.cc @@ -88,11 +88,41 @@ INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, GrpcClientIntegrationTest, TEST_P(GrpcClientIntegrationTest, BasicStream) { initialize(); auto stream = createStream(empty_metadata_); + // Send request without END_STREAM set to true. By default Envoy gRPC client will reset + // the stream upon receiving response with trailers. Google gRPC client will not reset the + // stream as it by default supports independent half-close. auto stream = + // createStream(empty_metadata_); stream->sendRequest(); stream->sendServerInitialMetadata(empty_metadata_); stream->sendReply(); stream->sendServerTrailers(Status::WellKnownGrpcStatus::Ok, "", empty_metadata_); dispatcher_helper_.runDispatcher(); + + if (clientType() == ClientType::EnvoyGrpc) { // Envoy gRPC based AsyncGrpcClient should reset + // stream, since server half-closed before client. + EXPECT_EQ( + cm_.thread_local_cluster_.cluster_.info_->trafficStats()->upstream_rq_tx_reset_.value(), 1); + stream->waitForReset(); + } +} + +// Validate that a simple request-reply stream works. +TEST_P(GrpcClientIntegrationTest, BasicStreamGracefulClose) { + initialize(); + auto stream = createStream(empty_metadata_); + // Send request with end_stream set to true, causing gRPC client to half close + // the stream. + RequestArgs request_args{nullptr, true}; + stream->sendRequest(request_args); + stream->sendServerInitialMetadata(empty_metadata_); + stream->sendReply(); + stream->sendServerTrailers(Status::WellKnownGrpcStatus::Ok, "", empty_metadata_); + dispatcher_helper_.runDispatcher(); + + // AsyncGrpcClient should not cause local reset, completing the stream gracefully. + EXPECT_EQ(cm_.thread_local_cluster_.cluster_.info_->trafficStats()->upstream_rq_tx_reset_.value(), + 0); + stream->waitForEndStream(); } // A simple request-reply stream, "x-envoy-internal" and `x-forward-for` headers diff --git a/test/common/grpc/grpc_client_integration_test_harness.h b/test/common/grpc/grpc_client_integration_test_harness.h index 10beadbcab12..4a79ddde427d 100644 --- a/test/common/grpc/grpc_client_integration_test_harness.h +++ b/test/common/grpc/grpc_client_integration_test_harness.h @@ -234,10 +234,19 @@ class HelloworldStream : public MockAsyncStreamCallbacks void closeStream() { grpc_stream_->closeStream(); + waitForEndStream(); + } + + void waitForEndStream() { AssertionResult result = fake_stream_->waitForEndStream(dispatcher_helper_.dispatcher_); RELEASE_ASSERT(result, result.message()); } + void waitForReset() { + AssertionResult result = fake_stream_->waitForReset(dispatcher_helper_.dispatcher_); + RELEASE_ASSERT(result, result.message()); + } + DispatcherHelper& dispatcher_helper_; FakeStream* fake_stream_{}; AsyncStream grpc_stream_{}; diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index 40b0f8741139..21fc40f8bf46 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -1088,6 +1088,45 @@ TEST_P(ExtProcIntegrationTest, GetAndFailStreamOnResponse) { verifyDownstreamResponse(*response, 500); } +TEST_P(ExtProcIntegrationTest, OnlyRequestHeaders) { + // Skip the header processing on response path. + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + initializeConfig(); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + processRequestHeadersMessage( + *grpc_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse& headers_resp) { + // The response does not really matter, it just needs to be non-empty. + auto response_header_mutation = headers_resp.mutable_response()->mutable_header_mutation(); + auto* mut1 = response_header_mutation->add_set_headers(); + mut1->mutable_header()->set_key("x-new-header"); + mut1->mutable_header()->set_raw_value("new"); + return true; + }); + // ext_proc is configured to only send request headers. In this case, server indicates that it is + // not expecting any more messages from ext_proc filter and half-closes the stream. + processor_stream_->finishGrpcStream(Grpc::Status::Ok); + + // ext_proc will immediately close side stream in this case, because by default Envoy gRPC client + // will reset the stream if the server half-closes before the client. Note that the ext_proc + // filter has not yet half-closed the sidestream, since it is doing it during its destruction. + // TODO(yanavlasov): Enable independent half-close for Envoy gRPC client and remove this check. + // TODO(yanavlasov): Reset in Google gRPC case is unexpected. Diagnose and fix. + EXPECT_TRUE(processor_stream_->waitForReset()); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(100, true); + + verifyDownstreamResponse(*response, 200); +} + // Test the filter using the default configuration by connecting to // an ext_proc server that responds to the request_headers message // by requesting to modify the request headers. @@ -2106,6 +2145,11 @@ TEST_P(ExtProcIntegrationTest, GetAndRespondImmediately) { hdr2->mutable_header()->set_raw_value("application/json"); }); + // ext_proc will immediately close side stream in this case, which causes it to be reset, + // since side stream codec had not yet observed server trailers. + // TODO(yanavlasov): Separate lifetimes of ext_proc and sidestream. + EXPECT_TRUE(processor_stream_->waitForReset()); + verifyDownstreamResponse(*response, 401); EXPECT_THAT(response->headers(), SingleHeaderValueIs("x-failure-reason", "testing")); EXPECT_THAT(response->headers(), SingleHeaderValueIs("content-type", "application/json")); diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index e03913a024da..e43bdf03f552 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -324,6 +324,17 @@ AssertionResult FakeStream::waitForReset(milliseconds timeout) { return AssertionSuccess(); } +AssertionResult FakeStream::waitForReset(Event::Dispatcher& client_dispatcher, + std::chrono::milliseconds timeout) { + absl::MutexLock lock(&lock_); + if (!waitForWithDispatcherRun( + time_system_, lock_, [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return saw_reset_; }, + client_dispatcher, timeout)) { + return AssertionFailure() << "Timed out waiting for reset of stream."; + } + return AssertionSuccess(); +} + void FakeStream::startGrpcStream(bool send_headers) { ASSERT(!grpc_stream_started_, "gRPC stream should not be started more than once"); grpc_stream_started_ = true; diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index dc591b36ab93..39c0d76db46a 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -165,6 +165,11 @@ class FakeStream : public Http::RequestDecoder, testing::AssertionResult waitForReset(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + ABSL_MUST_USE_RESULT + testing::AssertionResult + waitForReset(Event::Dispatcher& client_dispatcher, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + // gRPC convenience methods. void startGrpcStream(bool send_headers = true); void finishGrpcStream(Grpc::Status::GrpcStatus status);