diff --git a/source/common/router/router.cc b/source/common/router/router.cc index 3719b887a061..b31792ba2428 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -541,23 +541,26 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, } } - auto conn_pool = getConnPool(); - - if (!conn_pool) { + Http::ConnectionPool::Instance* http_pool = getHttpConnPool(); + Upstream::HostDescriptionConstSharedPtr host; + if (http_pool) { + host = http_pool->host(); + } else { sendNoHealthyUpstreamResponse(); return Http::FilterHeadersStatus::StopIteration; } + if (debug_config && debug_config->append_upstream_host_) { // The hostname and address will be appended to any local or upstream responses from this point, // possibly in addition to the cluster name. - modify_headers = [modify_headers, debug_config, conn_pool](Http::ResponseHeaderMap& headers) { + modify_headers = [modify_headers, debug_config, host](Http::ResponseHeaderMap& headers) { modify_headers(headers); headers.addCopy( debug_config->hostname_header_.value_or(Http::Headers::get().EnvoyUpstreamHostname), - conn_pool->host()->hostname()); + host->hostname()); headers.addCopy(debug_config->host_address_header_.value_or( Http::Headers::get().EnvoyUpstreamHostAddress), - conn_pool->host()->address()->asString()); + host->address()->asString()); }; } @@ -608,8 +611,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, route_entry_->finalizeRequestHeaders(headers, callbacks_->streamInfo(), !config_.suppress_envoy_headers_); - FilterUtility::setUpstreamScheme( - headers, conn_pool->host()->transportSocketFactory().implementsSecureTransport()); + FilterUtility::setUpstreamScheme(headers, + host->transportSocketFactory().implementsSecureTransport()); // Ensure an http transport scheme is selected before continuing with decoding. ASSERT(headers.Scheme()); @@ -632,7 +635,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, // Hang onto the modify_headers function for later use in handling upstream responses. modify_headers_ = modify_headers; - UpstreamRequestPtr upstream_request = std::make_unique(*this, *conn_pool); + UpstreamRequestPtr upstream_request = + std::make_unique(*this, std::make_unique(*http_pool)); upstream_request->moveIntoList(std::move(upstream_request), upstream_requests_); upstream_requests_.front()->encodeHeaders(end_stream); if (end_stream) { @@ -642,7 +646,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, return Http::FilterHeadersStatus::StopIteration; } -Http::ConnectionPool::Instance* Filter::getConnPool() { +Http::ConnectionPool::Instance* Filter::getHttpConnPool() { // Choose protocol based on cluster configuration and downstream connection // Note: Cluster may downgrade HTTP2 to HTTP1 based on runtime configuration. Http::Protocol protocol = cluster_->upstreamHttpProtocol(callbacks_->streamInfo().protocol()); @@ -1425,8 +1429,15 @@ void Filter::doRetry() { attempt_count_++; ASSERT(pending_retries_ > 0); pending_retries_--; - Http::ConnectionPool::Instance* conn_pool = getConnPool(); - if (!conn_pool) { + UpstreamRequestPtr upstream_request; + + Http::ConnectionPool::Instance* conn_pool = getHttpConnPool(); + if (conn_pool) { + upstream_request = + std::make_unique(*this, std::make_unique(*conn_pool)); + } + + if (!upstream_request) { sendNoHealthyUpstreamResponse(); cleanup(); return; @@ -1437,7 +1448,6 @@ void Filter::doRetry() { } ASSERT(response_timeout_ || timeout_.global_timeout_.count() == 0); - UpstreamRequestPtr upstream_request = std::make_unique(*this, *conn_pool); UpstreamRequest* upstream_request_tmp = upstream_request.get(); upstream_request->moveIntoList(std::move(upstream_request), upstream_requests_); upstream_requests_.front()->encodeHeaders(!callbacks_->decodingBuffer() && !downstream_trailers_); diff --git a/source/common/router/router.h b/source/common/router/router.h index e8d5600a24ac..de525aaa6701 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -398,7 +398,7 @@ class Filter : Logger::Loggable, const Upstream::ClusterInfo& cluster, const VirtualCluster* vcluster, Runtime::Loader& runtime, Runtime::RandomGenerator& random, Event::Dispatcher& dispatcher, Upstream::ResourcePriority priority) PURE; - Http::ConnectionPool::Instance* getConnPool(); + Http::ConnectionPool::Instance* getHttpConnPool(); void maybeDoShadowing(); bool maybeRetryReset(Http::StreamResetReason reset_reason, UpstreamRequest& upstream_request); uint32_t numRequestsAwaitingHeaders(); diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index 3fcdea82d5a5..4ebc72894966 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -41,9 +41,9 @@ namespace Envoy { namespace Router { -UpstreamRequest::UpstreamRequest(Filter& parent, Http::ConnectionPool::Instance& pool) - : parent_(parent), conn_pool_(pool), grpc_rq_success_deferred_(false), - stream_info_(pool.protocol(), parent_.callbacks_->dispatcher().timeSource()), +UpstreamRequest::UpstreamRequest(Filter& parent, std::unique_ptr&& conn_pool) + : parent_(parent), conn_pool_(std::move(conn_pool)), grpc_rq_success_deferred_(false), + stream_info_(parent_.callbacks_->dispatcher().timeSource()), start_time_(parent_.callbacks_->dispatcher().timeSource().monotonicTime()), calling_encode_headers_(false), upstream_canary_(false), decode_complete_(false), encode_complete_(false), encode_trailers_(false), retried_(false), awaiting_headers_(true), @@ -61,6 +61,9 @@ UpstreamRequest::UpstreamRequest(Filter& parent, Http::ConnectionPool::Instance& } stream_info_.healthCheck(parent_.callbacks_->streamInfo().healthCheck()); + if (conn_pool_->protocol().has_value()) { + stream_info_.protocol(conn_pool_->protocol().value()); + } } UpstreamRequest::~UpstreamRequest() { @@ -162,13 +165,7 @@ void UpstreamRequest::encodeHeaders(bool end_stream) { ASSERT(!encode_complete_); encode_complete_ = end_stream; - // It's possible for a reset to happen inline within the newStream() call. In this case, we - // might get deleted inline as well. Only write the returned handle out if it is not nullptr to - // deal with this case. - Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(*this, *this); - if (handle) { - conn_pool_stream_handle_ = handle; - } + conn_pool_->newStream(this); } void UpstreamRequest::encodeData(Buffer::Instance& data, bool end_stream) { @@ -257,11 +254,9 @@ void UpstreamRequest::resetStream() { span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True); } - if (conn_pool_stream_handle_) { - ENVOY_STREAM_LOG(debug, "cancelling pool request", *parent_.callbacks_); + if (conn_pool_->cancelAnyPendingRequest()) { + ENVOY_STREAM_LOG(debug, "canceled pool request", *parent_.callbacks_); ASSERT(!upstream_); - conn_pool_stream_handle_->cancel(); - conn_pool_stream_handle_ = nullptr; } if (upstream_) { @@ -313,12 +308,15 @@ void UpstreamRequest::onPoolFailure(Http::ConnectionPool::PoolFailureReason reas onResetStream(reset_reason, transport_failure_reason); } -void UpstreamRequest::onPoolReady(Http::RequestEncoder& request_encoder, - Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info) { +void UpstreamRequest::onPoolReady( + std::unique_ptr&& upstream, Upstream::HostDescriptionConstSharedPtr host, + const Network::Address::InstanceConstSharedPtr& upstream_local_address, + const StreamInfo::StreamInfo& info) { // This may be called under an existing ScopeTrackerScopeState but it will unwind correctly. ScopeTrackerScopeState scope(&parent_.callbacks_->scope(), parent_.callbacks_->dispatcher()); ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks_); + upstream_ = std::move(upstream); + if (parent_.request_vcluster_) { // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady // callback. Hence, the upstream request increases the virtual cluster's upstream_rq_total_ stat @@ -330,9 +328,8 @@ void UpstreamRequest::onPoolReady(Http::RequestEncoder& request_encoder, onUpstreamHostSelected(host); - stream_info_.setUpstreamLocalAddress(request_encoder.getStream().connectionLocalAddress()); - parent_.callbacks_->streamInfo().setUpstreamLocalAddress( - request_encoder.getStream().connectionLocalAddress()); + stream_info_.setUpstreamLocalAddress(upstream_local_address); + parent_.callbacks_->streamInfo().setUpstreamLocalAddress(upstream_local_address); stream_info_.setUpstreamSslConnection(info.downstreamSslConnection()); parent_.callbacks_->streamInfo().setUpstreamSslConnection(info.downstreamSslConnection()); @@ -343,8 +340,11 @@ void UpstreamRequest::onPoolReady(Http::RequestEncoder& request_encoder, create_per_try_timeout_on_request_complete_ = true; } - conn_pool_stream_handle_ = nullptr; - setRequestEncoder(request_encoder); + // Make sure the connection manager will inform the downstream watermark manager when the + // downstream buffers are overrun. This may result in immediate watermark callbacks referencing + // the encoder. + parent_.callbacks_->addDownstreamWatermarkCallbacks(downstream_watermark_manager_); + calling_encode_headers_ = true; if (parent_.route_entry_->autoHostRewrite() && !host->hostname().empty()) { parent_.downstream_headers_->setHost(host->hostname()); @@ -360,8 +360,7 @@ void UpstreamRequest::onPoolReady(Http::RequestEncoder& request_encoder, // If end_stream is set in headers, and there are metadata to send, delays end_stream. The case // only happens when decoding headers filters return ContinueAndEndStream. const bool delay_headers_end_stream = end_stream && !downstream_metadata_map_vector_.empty(); - request_encoder.encodeHeaders(*parent_.downstream_headers_, - end_stream && !delay_headers_end_stream); + upstream_->encodeHeaders(*parent_.downstream_headers_, end_stream && !delay_headers_end_stream); calling_encode_headers_ = false; // It is possible to get reset in the middle of an encodeHeaders() call. This happens for @@ -376,21 +375,21 @@ void UpstreamRequest::onPoolReady(Http::RequestEncoder& request_encoder, if (!downstream_metadata_map_vector_.empty()) { ENVOY_STREAM_LOG(debug, "Send metadata onPoolReady. {}", *parent_.callbacks_, downstream_metadata_map_vector_); - request_encoder.encodeMetadata(downstream_metadata_map_vector_); + upstream_->encodeMetadata(downstream_metadata_map_vector_); downstream_metadata_map_vector_.clear(); if (delay_headers_end_stream) { Buffer::OwnedImpl empty_data(""); - request_encoder.encodeData(empty_data, true); + upstream_->encodeData(empty_data, true); } } if (buffered_request_body_) { stream_info_.addBytesSent(buffered_request_body_->length()); - request_encoder.encodeData(*buffered_request_body_, encode_complete_ && !encode_trailers_); + upstream_->encodeData(*buffered_request_body_, encode_complete_ && !encode_trailers_); } if (encode_trailers_) { - request_encoder.encodeTrailers(*parent_.downstream_trailers_); + upstream_->encodeTrailers(*parent_.downstream_trailers_); } if (encode_complete_) { @@ -399,14 +398,6 @@ void UpstreamRequest::onPoolReady(Http::RequestEncoder& request_encoder, } } -void UpstreamRequest::setRequestEncoder(Http::RequestEncoder& request_encoder) { - upstream_.reset(new HttpUpstream(*this, &request_encoder)); - // Now that there is an encoder, have the connection manager inform the manager when the - // downstream buffers are overrun. This may result in immediate watermark callbacks referencing - // the encoder. - parent_.callbacks_->addDownstreamWatermarkCallbacks(downstream_watermark_manager_); -} - void UpstreamRequest::clearRequestEncoder() { // Before clearing the encoder, unsubscribe from callbacks. if (upstream_) { @@ -472,5 +463,41 @@ void UpstreamRequest::enableDataFromDownstreamForFlowControl() { parent_.callbacks_->onDecoderFilterBelowWriteBufferLowWatermark(); } +void HttpConnPool::newStream(UpstreamRequest* request) { + request_ = request; + // It's possible for a reset to happen inline within the newStream() call. In this case, we + // might get deleted inline as well. Only write the returned handle out if it is not nullptr to + // deal with this case. + Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(*request, *this); + if (handle) { + conn_pool_stream_handle_ = handle; + } +} + +bool HttpConnPool::cancelAnyPendingRequest() { + if (conn_pool_stream_handle_) { + conn_pool_stream_handle_->cancel(); + conn_pool_stream_handle_ = nullptr; + return true; + } + return false; +} +absl::optional HttpConnPool::protocol() const { return conn_pool_.protocol(); } + +void HttpConnPool::onPoolFailure(Http::ConnectionPool::PoolFailureReason reason, + absl::string_view transport_failure_reason, + Upstream::HostDescriptionConstSharedPtr host) { + request_->onPoolFailure(reason, transport_failure_reason, host); +} + +void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder, + Upstream::HostDescriptionConstSharedPtr host, + const StreamInfo::StreamInfo& info) { + conn_pool_stream_handle_ = nullptr; + auto upstream = std::make_unique(*request_, &request_encoder); + request_->onPoolReady(std::move(upstream), host, + request_encoder.getStream().connectionLocalAddress(), info); +} + } // namespace Router } // namespace Envoy diff --git a/source/common/router/upstream_request.h b/source/common/router/upstream_request.h index 191da261fb96..6615555cc987 100644 --- a/source/common/router/upstream_request.h +++ b/source/common/router/upstream_request.h @@ -27,14 +27,29 @@ namespace Router { class Filter; class GenericUpstream; +class UpstreamRequest; + +// An API for wrapping either an HTTP or a TCP connection pool. +class GenericConnPool : public Logger::Loggable { +public: + virtual ~GenericConnPool() = default; + // Called to create a new HTTP stream or TCP connection. The implementation + // is then responsible for calling either onPoolReady or onPoolFailure on the + // supplied UpstreamRequest. + virtual void newStream(UpstreamRequest* request) PURE; + // Called to cancel a call to newStream. Returns true if a newStream request + // was canceled, false otherwise. + virtual bool cancelAnyPendingRequest() PURE; + // Optionally returns the protocol for the connection pool. + virtual absl::optional protocol() const PURE; +}; // The base request for Upstream. class UpstreamRequest : public Logger::Loggable, public Http::ResponseDecoder, - public Http::ConnectionPool::Callbacks, public LinkedObject { public: - UpstreamRequest(Filter& parent, Http::ConnectionPool::Instance& pool); + UpstreamRequest(Filter& parent, std::unique_ptr&& conn_pool); ~UpstreamRequest() override; void encodeHeaders(bool end_stream); @@ -62,15 +77,14 @@ class UpstreamRequest : public Logger::Loggable, void disableDataFromDownstreamForFlowControl(); void enableDataFromDownstreamForFlowControl(); - // Http::ConnectionPool::Callbacks void onPoolFailure(Http::ConnectionPool::PoolFailureReason reason, absl::string_view transport_failure_reason, - Upstream::HostDescriptionConstSharedPtr host) override; - void onPoolReady(Http::RequestEncoder& request_encoder, + Upstream::HostDescriptionConstSharedPtr host); + void onPoolReady(std::unique_ptr&& upstream, Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info) override; + const Network::Address::InstanceConstSharedPtr& upstream_local_address, + const StreamInfo::StreamInfo& info); - void setRequestEncoder(Http::RequestEncoder& request_encoder); void clearRequestEncoder(); struct DownstreamWatermarkManager : public Http::DownstreamWatermarkCallbacks { @@ -106,10 +120,9 @@ class UpstreamRequest : public Logger::Loggable, private: Filter& parent_; - Http::ConnectionPool::Instance& conn_pool_; + std::unique_ptr conn_pool_; bool grpc_rq_success_deferred_; Event::TimerPtr per_try_timeout_; - Http::ConnectionPool::Cancellable* conn_pool_stream_handle_{}; std::unique_ptr upstream_; absl::optional deferred_reset_reason_; Buffer::WatermarkBufferPtr buffered_request_body_; @@ -142,6 +155,30 @@ class UpstreamRequest : public Logger::Loggable, bool record_timeout_budget_ : 1; }; +class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callbacks { +public: + HttpConnPool(Http::ConnectionPool::Instance& conn_pool) : conn_pool_(conn_pool) {} + + // GenericConnPool + void newStream(UpstreamRequest* request) override; + bool cancelAnyPendingRequest() override; + absl::optional protocol() const override; + + // Http::ConnectionPool::Callbacks + void onPoolFailure(Http::ConnectionPool::PoolFailureReason reason, + absl::string_view transport_failure_reason, + Upstream::HostDescriptionConstSharedPtr host) override; + void onPoolReady(Http::RequestEncoder& request_encoder, + Upstream::HostDescriptionConstSharedPtr host, + const StreamInfo::StreamInfo& info) override; + +private: + // Points to the actual connection pool to create streams from. + Http::ConnectionPool::Instance& conn_pool_; + Http::ConnectionPool::Cancellable* conn_pool_stream_handle_{}; + UpstreamRequest* request_{}; +}; + // A generic API which covers common functionality between HTTP and TCP upstreams. class GenericUpstream { public: