Skip to content

Commit

Permalink
router: refactoring into neutral APIs (envoyproxy#10503)
Browse files Browse the repository at this point in the history
Trying to minimize diffs for upcoming PR adding a TCP upstream.

Risk Level: Medium (router refactor)
Testing: n/a
Docs Changes: n/a
Release Notes: n/a
Part of envoyproxy#1630

Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk authored Mar 26, 2020
1 parent f23165b commit 7fdf056
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 59 deletions.
36 changes: 23 additions & 13 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -541,23 +541,26 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
}
}

auto conn_pool = getConnPool();

if (!conn_pool) {
Http::ConnectionPool::Instance* http_pool = getHttpConnPool();
Upstream::HostDescriptionConstSharedPtr host;
if (http_pool) {
host = http_pool->host();
} else {
sendNoHealthyUpstreamResponse();
return Http::FilterHeadersStatus::StopIteration;
}

if (debug_config && debug_config->append_upstream_host_) {
// The hostname and address will be appended to any local or upstream responses from this point,
// possibly in addition to the cluster name.
modify_headers = [modify_headers, debug_config, conn_pool](Http::ResponseHeaderMap& headers) {
modify_headers = [modify_headers, debug_config, host](Http::ResponseHeaderMap& headers) {
modify_headers(headers);
headers.addCopy(
debug_config->hostname_header_.value_or(Http::Headers::get().EnvoyUpstreamHostname),
conn_pool->host()->hostname());
host->hostname());
headers.addCopy(debug_config->host_address_header_.value_or(
Http::Headers::get().EnvoyUpstreamHostAddress),
conn_pool->host()->address()->asString());
host->address()->asString());
};
}

Expand Down Expand Up @@ -608,8 +611,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,

route_entry_->finalizeRequestHeaders(headers, callbacks_->streamInfo(),
!config_.suppress_envoy_headers_);
FilterUtility::setUpstreamScheme(
headers, conn_pool->host()->transportSocketFactory().implementsSecureTransport());
FilterUtility::setUpstreamScheme(headers,
host->transportSocketFactory().implementsSecureTransport());

// Ensure an http transport scheme is selected before continuing with decoding.
ASSERT(headers.Scheme());
Expand All @@ -632,7 +635,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
// Hang onto the modify_headers function for later use in handling upstream responses.
modify_headers_ = modify_headers;

UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(*this, *conn_pool);
UpstreamRequestPtr upstream_request =
std::make_unique<UpstreamRequest>(*this, std::make_unique<HttpConnPool>(*http_pool));
upstream_request->moveIntoList(std::move(upstream_request), upstream_requests_);
upstream_requests_.front()->encodeHeaders(end_stream);
if (end_stream) {
Expand All @@ -642,7 +646,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
return Http::FilterHeadersStatus::StopIteration;
}

Http::ConnectionPool::Instance* Filter::getConnPool() {
Http::ConnectionPool::Instance* Filter::getHttpConnPool() {
// Choose protocol based on cluster configuration and downstream connection
// Note: Cluster may downgrade HTTP2 to HTTP1 based on runtime configuration.
Http::Protocol protocol = cluster_->upstreamHttpProtocol(callbacks_->streamInfo().protocol());
Expand Down Expand Up @@ -1425,8 +1429,15 @@ void Filter::doRetry() {
attempt_count_++;
ASSERT(pending_retries_ > 0);
pending_retries_--;
Http::ConnectionPool::Instance* conn_pool = getConnPool();
if (!conn_pool) {
UpstreamRequestPtr upstream_request;

Http::ConnectionPool::Instance* conn_pool = getHttpConnPool();
if (conn_pool) {
upstream_request =
std::make_unique<UpstreamRequest>(*this, std::make_unique<HttpConnPool>(*conn_pool));
}

if (!upstream_request) {
sendNoHealthyUpstreamResponse();
cleanup();
return;
Expand All @@ -1437,7 +1448,6 @@ void Filter::doRetry() {
}

ASSERT(response_timeout_ || timeout_.global_timeout_.count() == 0);
UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(*this, *conn_pool);
UpstreamRequest* upstream_request_tmp = upstream_request.get();
upstream_request->moveIntoList(std::move(upstream_request), upstream_requests_);
upstream_requests_.front()->encodeHeaders(!callbacks_->decodingBuffer() && !downstream_trailers_);
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
const Upstream::ClusterInfo& cluster, const VirtualCluster* vcluster,
Runtime::Loader& runtime, Runtime::RandomGenerator& random,
Event::Dispatcher& dispatcher, Upstream::ResourcePriority priority) PURE;
Http::ConnectionPool::Instance* getConnPool();
Http::ConnectionPool::Instance* getHttpConnPool();
void maybeDoShadowing();
bool maybeRetryReset(Http::StreamResetReason reset_reason, UpstreamRequest& upstream_request);
uint32_t numRequestsAwaitingHeaders();
Expand Down
99 changes: 63 additions & 36 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
namespace Envoy {
namespace Router {

UpstreamRequest::UpstreamRequest(Filter& parent, Http::ConnectionPool::Instance& pool)
: parent_(parent), conn_pool_(pool), grpc_rq_success_deferred_(false),
stream_info_(pool.protocol(), parent_.callbacks_->dispatcher().timeSource()),
UpstreamRequest::UpstreamRequest(Filter& parent, std::unique_ptr<GenericConnPool>&& conn_pool)
: parent_(parent), conn_pool_(std::move(conn_pool)), grpc_rq_success_deferred_(false),
stream_info_(parent_.callbacks_->dispatcher().timeSource()),
start_time_(parent_.callbacks_->dispatcher().timeSource().monotonicTime()),
calling_encode_headers_(false), upstream_canary_(false), decode_complete_(false),
encode_complete_(false), encode_trailers_(false), retried_(false), awaiting_headers_(true),
Expand All @@ -61,6 +61,9 @@ UpstreamRequest::UpstreamRequest(Filter& parent, Http::ConnectionPool::Instance&
}

stream_info_.healthCheck(parent_.callbacks_->streamInfo().healthCheck());
if (conn_pool_->protocol().has_value()) {
stream_info_.protocol(conn_pool_->protocol().value());
}
}

UpstreamRequest::~UpstreamRequest() {
Expand Down Expand Up @@ -162,13 +165,7 @@ void UpstreamRequest::encodeHeaders(bool end_stream) {
ASSERT(!encode_complete_);
encode_complete_ = end_stream;

// It's possible for a reset to happen inline within the newStream() call. In this case, we
// might get deleted inline as well. Only write the returned handle out if it is not nullptr to
// deal with this case.
Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(*this, *this);
if (handle) {
conn_pool_stream_handle_ = handle;
}
conn_pool_->newStream(this);
}

void UpstreamRequest::encodeData(Buffer::Instance& data, bool end_stream) {
Expand Down Expand Up @@ -257,11 +254,9 @@ void UpstreamRequest::resetStream() {
span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True);
}

if (conn_pool_stream_handle_) {
ENVOY_STREAM_LOG(debug, "cancelling pool request", *parent_.callbacks_);
if (conn_pool_->cancelAnyPendingRequest()) {
ENVOY_STREAM_LOG(debug, "canceled pool request", *parent_.callbacks_);
ASSERT(!upstream_);
conn_pool_stream_handle_->cancel();
conn_pool_stream_handle_ = nullptr;
}

if (upstream_) {
Expand Down Expand Up @@ -313,12 +308,15 @@ void UpstreamRequest::onPoolFailure(Http::ConnectionPool::PoolFailureReason reas
onResetStream(reset_reason, transport_failure_reason);
}

void UpstreamRequest::onPoolReady(Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) {
void UpstreamRequest::onPoolReady(
std::unique_ptr<GenericUpstream>&& upstream, Upstream::HostDescriptionConstSharedPtr host,
const Network::Address::InstanceConstSharedPtr& upstream_local_address,
const StreamInfo::StreamInfo& info) {
// This may be called under an existing ScopeTrackerScopeState but it will unwind correctly.
ScopeTrackerScopeState scope(&parent_.callbacks_->scope(), parent_.callbacks_->dispatcher());
ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks_);
upstream_ = std::move(upstream);

if (parent_.request_vcluster_) {
// The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady
// callback. Hence, the upstream request increases the virtual cluster's upstream_rq_total_ stat
Expand All @@ -330,9 +328,8 @@ void UpstreamRequest::onPoolReady(Http::RequestEncoder& request_encoder,

onUpstreamHostSelected(host);

stream_info_.setUpstreamLocalAddress(request_encoder.getStream().connectionLocalAddress());
parent_.callbacks_->streamInfo().setUpstreamLocalAddress(
request_encoder.getStream().connectionLocalAddress());
stream_info_.setUpstreamLocalAddress(upstream_local_address);
parent_.callbacks_->streamInfo().setUpstreamLocalAddress(upstream_local_address);

stream_info_.setUpstreamSslConnection(info.downstreamSslConnection());
parent_.callbacks_->streamInfo().setUpstreamSslConnection(info.downstreamSslConnection());
Expand All @@ -343,8 +340,11 @@ void UpstreamRequest::onPoolReady(Http::RequestEncoder& request_encoder,
create_per_try_timeout_on_request_complete_ = true;
}

conn_pool_stream_handle_ = nullptr;
setRequestEncoder(request_encoder);
// Make sure the connection manager will inform the downstream watermark manager when the
// downstream buffers are overrun. This may result in immediate watermark callbacks referencing
// the encoder.
parent_.callbacks_->addDownstreamWatermarkCallbacks(downstream_watermark_manager_);

calling_encode_headers_ = true;
if (parent_.route_entry_->autoHostRewrite() && !host->hostname().empty()) {
parent_.downstream_headers_->setHost(host->hostname());
Expand All @@ -360,8 +360,7 @@ void UpstreamRequest::onPoolReady(Http::RequestEncoder& request_encoder,
// If end_stream is set in headers, and there are metadata to send, delays end_stream. The case
// only happens when decoding headers filters return ContinueAndEndStream.
const bool delay_headers_end_stream = end_stream && !downstream_metadata_map_vector_.empty();
request_encoder.encodeHeaders(*parent_.downstream_headers_,
end_stream && !delay_headers_end_stream);
upstream_->encodeHeaders(*parent_.downstream_headers_, end_stream && !delay_headers_end_stream);
calling_encode_headers_ = false;

// It is possible to get reset in the middle of an encodeHeaders() call. This happens for
Expand All @@ -376,21 +375,21 @@ void UpstreamRequest::onPoolReady(Http::RequestEncoder& request_encoder,
if (!downstream_metadata_map_vector_.empty()) {
ENVOY_STREAM_LOG(debug, "Send metadata onPoolReady. {}", *parent_.callbacks_,
downstream_metadata_map_vector_);
request_encoder.encodeMetadata(downstream_metadata_map_vector_);
upstream_->encodeMetadata(downstream_metadata_map_vector_);
downstream_metadata_map_vector_.clear();
if (delay_headers_end_stream) {
Buffer::OwnedImpl empty_data("");
request_encoder.encodeData(empty_data, true);
upstream_->encodeData(empty_data, true);
}
}

if (buffered_request_body_) {
stream_info_.addBytesSent(buffered_request_body_->length());
request_encoder.encodeData(*buffered_request_body_, encode_complete_ && !encode_trailers_);
upstream_->encodeData(*buffered_request_body_, encode_complete_ && !encode_trailers_);
}

if (encode_trailers_) {
request_encoder.encodeTrailers(*parent_.downstream_trailers_);
upstream_->encodeTrailers(*parent_.downstream_trailers_);
}

if (encode_complete_) {
Expand All @@ -399,14 +398,6 @@ void UpstreamRequest::onPoolReady(Http::RequestEncoder& request_encoder,
}
}

void UpstreamRequest::setRequestEncoder(Http::RequestEncoder& request_encoder) {
upstream_.reset(new HttpUpstream(*this, &request_encoder));
// Now that there is an encoder, have the connection manager inform the manager when the
// downstream buffers are overrun. This may result in immediate watermark callbacks referencing
// the encoder.
parent_.callbacks_->addDownstreamWatermarkCallbacks(downstream_watermark_manager_);
}

void UpstreamRequest::clearRequestEncoder() {
// Before clearing the encoder, unsubscribe from callbacks.
if (upstream_) {
Expand Down Expand Up @@ -472,5 +463,41 @@ void UpstreamRequest::enableDataFromDownstreamForFlowControl() {
parent_.callbacks_->onDecoderFilterBelowWriteBufferLowWatermark();
}

void HttpConnPool::newStream(UpstreamRequest* request) {
request_ = request;
// It's possible for a reset to happen inline within the newStream() call. In this case, we
// might get deleted inline as well. Only write the returned handle out if it is not nullptr to
// deal with this case.
Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(*request, *this);
if (handle) {
conn_pool_stream_handle_ = handle;
}
}

bool HttpConnPool::cancelAnyPendingRequest() {
if (conn_pool_stream_handle_) {
conn_pool_stream_handle_->cancel();
conn_pool_stream_handle_ = nullptr;
return true;
}
return false;
}
absl::optional<Http::Protocol> HttpConnPool::protocol() const { return conn_pool_.protocol(); }

void HttpConnPool::onPoolFailure(Http::ConnectionPool::PoolFailureReason reason,
absl::string_view transport_failure_reason,
Upstream::HostDescriptionConstSharedPtr host) {
request_->onPoolFailure(reason, transport_failure_reason, host);
}

void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) {
conn_pool_stream_handle_ = nullptr;
auto upstream = std::make_unique<HttpUpstream>(*request_, &request_encoder);
request_->onPoolReady(std::move(upstream), host,
request_encoder.getStream().connectionLocalAddress(), info);
}

} // namespace Router
} // namespace Envoy
55 changes: 46 additions & 9 deletions source/common/router/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,29 @@ namespace Router {

class Filter;
class GenericUpstream;
class UpstreamRequest;

// An API for wrapping either an HTTP or a TCP connection pool.
class GenericConnPool : public Logger::Loggable<Logger::Id::router> {
public:
virtual ~GenericConnPool() = default;
// Called to create a new HTTP stream or TCP connection. The implementation
// is then responsible for calling either onPoolReady or onPoolFailure on the
// supplied UpstreamRequest.
virtual void newStream(UpstreamRequest* request) PURE;
// Called to cancel a call to newStream. Returns true if a newStream request
// was canceled, false otherwise.
virtual bool cancelAnyPendingRequest() PURE;
// Optionally returns the protocol for the connection pool.
virtual absl::optional<Http::Protocol> protocol() const PURE;
};

// The base request for Upstream.
class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
public Http::ResponseDecoder,
public Http::ConnectionPool::Callbacks,
public LinkedObject<UpstreamRequest> {
public:
UpstreamRequest(Filter& parent, Http::ConnectionPool::Instance& pool);
UpstreamRequest(Filter& parent, std::unique_ptr<GenericConnPool>&& conn_pool);
~UpstreamRequest() override;

void encodeHeaders(bool end_stream);
Expand Down Expand Up @@ -62,15 +77,14 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
void disableDataFromDownstreamForFlowControl();
void enableDataFromDownstreamForFlowControl();

// Http::ConnectionPool::Callbacks
void onPoolFailure(Http::ConnectionPool::PoolFailureReason reason,
absl::string_view transport_failure_reason,
Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host);
void onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) override;
const Network::Address::InstanceConstSharedPtr& upstream_local_address,
const StreamInfo::StreamInfo& info);

void setRequestEncoder(Http::RequestEncoder& request_encoder);
void clearRequestEncoder();

struct DownstreamWatermarkManager : public Http::DownstreamWatermarkCallbacks {
Expand Down Expand Up @@ -106,10 +120,9 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,

private:
Filter& parent_;
Http::ConnectionPool::Instance& conn_pool_;
std::unique_ptr<GenericConnPool> conn_pool_;
bool grpc_rq_success_deferred_;
Event::TimerPtr per_try_timeout_;
Http::ConnectionPool::Cancellable* conn_pool_stream_handle_{};
std::unique_ptr<GenericUpstream> upstream_;
absl::optional<Http::StreamResetReason> deferred_reset_reason_;
Buffer::WatermarkBufferPtr buffered_request_body_;
Expand Down Expand Up @@ -142,6 +155,30 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
bool record_timeout_budget_ : 1;
};

class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callbacks {
public:
HttpConnPool(Http::ConnectionPool::Instance& conn_pool) : conn_pool_(conn_pool) {}

// GenericConnPool
void newStream(UpstreamRequest* request) override;
bool cancelAnyPendingRequest() override;
absl::optional<Http::Protocol> protocol() const override;

// Http::ConnectionPool::Callbacks
void onPoolFailure(Http::ConnectionPool::PoolFailureReason reason,
absl::string_view transport_failure_reason,
Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) override;

private:
// Points to the actual connection pool to create streams from.
Http::ConnectionPool::Instance& conn_pool_;
Http::ConnectionPool::Cancellable* conn_pool_stream_handle_{};
UpstreamRequest* request_{};
};

// A generic API which covers common functionality between HTTP and TCP upstreams.
class GenericUpstream {
public:
Expand Down

0 comments on commit 7fdf056

Please sign in to comment.