Skip to content

Commit

Permalink
Clusterinfo consistency (#4600)
Browse files Browse the repository at this point in the history
Signed-off-by: Xin Zhuang <stevenzzz@stevenzzz6.cam.corp.google.com>
  • Loading branch information
stevenzzzz authored and mattklein123 committed Oct 8, 2018
1 parent d003be8 commit c5be849
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 14 deletions.
9 changes: 9 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "envoy/router/router.h"
#include "envoy/ssl/connection.h"
#include "envoy/tracing/http_tracer.h"
#include "envoy/upstream/upstream.h"

namespace Envoy {
namespace Http {
Expand Down Expand Up @@ -111,6 +112,14 @@ class StreamFilterCallbacks {
*/
virtual Router::RouteConstSharedPtr route() PURE;

/**
* Returns the clusterInfo for the cached route.
* This method is to avoid multiple look ups in the filter chain, it also provides a consistent
* view of clusterInfo after a route is picked/repicked.
* NOTE: Cached clusterInfo and route will be updated the same time.
*/
virtual Upstream::ClusterInfoConstSharedPtr clusterInfo() PURE;

/**
* Clears the route cache for the current request. This must be called when a filter has modified
* the headers in a way that would affect routing.
Expand Down
6 changes: 3 additions & 3 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ const AsyncStreamImpl::NullPathMatchCriterion
AsyncStreamImpl::RouteEntryImpl::path_match_criterion_;
const std::list<LowerCaseString> AsyncStreamImpl::NullConfig::internal_only_headers_;

AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store,
Event::Dispatcher& dispatcher,
AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster,
Stats::Store& stats_store, Event::Dispatcher& dispatcher,
const LocalInfo::LocalInfo& local_info,
Upstream::ClusterManager& cm, Runtime::Loader& runtime,
Runtime::RandomGenerator& random,
Expand Down Expand Up @@ -80,7 +80,7 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal
: parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
router_(parent.config_), stream_info_(Protocol::Http11, parent.dispatcher().timeSystem()),
tracing_config_(Tracing::EgressConfig::get()),
route_(std::make_shared<RouteImpl>(parent_.cluster_.name(), timeout)) {
route_(std::make_shared<RouteImpl>(parent_.cluster_->name(), timeout)) {
if (buffer_body_for_retry) {
buffered_body_.reset(new Buffer::OwnedImpl());
}
Expand Down
6 changes: 4 additions & 2 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "envoy/router/shadow_writer.h"
#include "envoy/ssl/connection.h"
#include "envoy/tracing/http_tracer.h"
#include "envoy/upstream/upstream.h"

#include "common/common/empty_string.h"
#include "common/common/linked_object.h"
Expand All @@ -35,7 +36,7 @@ class AsyncRequestImpl;

class AsyncClientImpl final : public AsyncClient {
public:
AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store,
AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster, Stats::Store& stats_store,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info,
Upstream::ClusterManager& cm, Runtime::Loader& runtime,
Runtime::RandomGenerator& random, Router::ShadowWriterPtr&& shadow_writer);
Expand All @@ -52,7 +53,7 @@ class AsyncClientImpl final : public AsyncClient {
Event::Dispatcher& dispatcher() override { return dispatcher_; }

private:
const Upstream::ClusterInfo& cluster_;
Upstream::ClusterInfoConstSharedPtr cluster_;
Router::FilterConfig config_;
Event::Dispatcher& dispatcher_;
std::list<std::unique_ptr<AsyncStreamImpl>> active_streams_;
Expand Down Expand Up @@ -267,6 +268,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
Event::Dispatcher& dispatcher() override { return parent_.dispatcher_; }
void resetStream() override;
Router::RouteConstSharedPtr route() override { return route_; }
Upstream::ClusterInfoConstSharedPtr clusterInfo() override { return parent_.cluster_; }
void clearRouteCache() override {}
uint64_t streamId() override { return stream_id_; }
StreamInfo::StreamInfo& streamInfo() override { return stream_info_; }
Expand Down
19 changes: 18 additions & 1 deletion source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,13 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() {
Router::RouteConstSharedPtr route = snapped_route_config_->route(*request_headers_, stream_id_);
stream_info_.route_entry_ = route ? route->routeEntry() : nullptr;
cached_route_ = std::move(route);
if (nullptr == stream_info_.route_entry_) {
cached_cluster_info_ = nullptr;
} else {
Upstream::ThreadLocalCluster* local_cluster =
connection_manager_.cluster_manager_.get(stream_info_.route_entry_->clusterName());
cached_cluster_info_ = (nullptr == local_cluster) ? nullptr : local_cluster->info();
}
}

void ConnectionManagerImpl::ActiveStream::sendLocalReply(
Expand Down Expand Up @@ -1477,8 +1484,17 @@ Tracing::Span& ConnectionManagerImpl::ActiveStreamFilterBase::activeSpan() {

Tracing::Config& ConnectionManagerImpl::ActiveStreamFilterBase::tracingConfig() { return parent_; }

Upstream::ClusterInfoConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::clusterInfo() {
// NOTE: Refreshing route caches clusterInfo as well.
if (!parent_.cached_route_.has_value()) {
parent_.refreshCachedRoute();
}

return parent_.cached_cluster_info_.value();
}

Router::RouteConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::route() {
if (!parent_.cached_route_) {
if (!parent_.cached_route_.has_value()) {
parent_.refreshCachedRoute();
}

Expand All @@ -1487,6 +1503,7 @@ Router::RouteConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::route

void ConnectionManagerImpl::ActiveStreamFilterBase::clearRouteCache() {
parent_.cached_route_ = absl::optional<Router::RouteConstSharedPtr>();
parent_.cached_cluster_info_ = absl::optional<Upstream::ClusterInfoConstSharedPtr>();
}

Buffer::WatermarkBufferPtr ConnectionManagerImpl::ActiveStreamDecoderFilter::createBuffer() {
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
Event::Dispatcher& dispatcher() override;
void resetStream() override;
Router::RouteConstSharedPtr route() override;
Upstream::ClusterInfoConstSharedPtr clusterInfo() override;
void clearRouteCache() override;
uint64_t streamId() override;
StreamInfo::StreamInfo& streamInfo() override;
Expand Down Expand Up @@ -403,6 +404,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
State state_;
StreamInfo::StreamInfoImpl stream_info_;
absl::optional<Router::RouteConstSharedPtr> cached_route_;
absl::optional<Upstream::ClusterInfoConstSharedPtr> cached_cluster_info_;
DownstreamWatermarkCallbacks* watermark_callbacks_{nullptr};
uint32_t buffer_limit_{0};
uint32_t high_watermark_count_{0};
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster,
const LoadBalancerFactorySharedPtr& lb_factory)
: parent_(parent), lb_factory_(lb_factory), cluster_info_(cluster),
http_async_client_(*cluster, parent.parent_.stats_, parent.thread_local_dispatcher_,
http_async_client_(cluster, parent.parent_.stats_, parent.thread_local_dispatcher_,
parent.parent_.local_info_, parent.parent_, parent.parent_.runtime_,
parent.parent_.random_,
Router::ShadowWriterPtr{new Router::ShadowWriterImpl(parent.parent_)}) {
Expand Down
2 changes: 1 addition & 1 deletion test/common/grpc/grpc_client_integration_test_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest {
EXPECT_CALL(cm_, httpConnPoolForCluster(_, _, _, _))
.WillRepeatedly(Return(http_conn_pool_.get()));
http_async_client_ = std::make_unique<Http::AsyncClientImpl>(
*cluster_info_ptr_, *stats_store_, dispatcher_, local_info_, cm_, runtime_, random_,
cluster_info_ptr_, *stats_store_, dispatcher_, local_info_, cm_, runtime_, random_,
std::move(shadow_writer_ptr_));
EXPECT_CALL(cm_, httpAsyncClientForCluster(fake_cluster_name_))
.WillRepeatedly(ReturnRef(*http_async_client_));
Expand Down
5 changes: 4 additions & 1 deletion test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace {
class AsyncClientImplTest : public testing::Test {
public:
AsyncClientImplTest()
: client_(*cm_.thread_local_cluster_.cluster_.info_, stats_store_, dispatcher_, local_info_,
: client_(cm_.thread_local_cluster_.cluster_.info_, stats_store_, dispatcher_, local_info_,
cm_, runtime_, random_,
Router::ShadowWriterPtr{new NiceMock<Router::MockShadowWriter>()}) {
message_->headers().insertMethod().value(std::string("GET"));
Expand Down Expand Up @@ -909,6 +909,9 @@ TEST_F(AsyncClientImplTest, RdsGettersTest) {
EXPECT_EQ("", route_config.name());
EXPECT_EQ(0, route_config.internalOnlyHeaders().size());
EXPECT_EQ(nullptr, route_config.route(headers, 0));
auto cluster_info = filter_callbacks->clusterInfo();
ASSERT_NE(nullptr, cluster_info);
EXPECT_EQ(cm_.thread_local_cluster_.cluster_.info_, cluster_info);
EXPECT_CALL(stream_callbacks_, onReset());
}

Expand Down
21 changes: 18 additions & 3 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "test/mocks/server/mocks.h"
#include "test/mocks/ssl/mocks.h"
#include "test/mocks/tracing/mocks.h"
#include "test/mocks/upstream/cluster_info.h"
#include "test/mocks/upstream/mocks.h"
#include "test/test_common/printers.h"
#include "test/test_common/test_time.h"
Expand Down Expand Up @@ -1507,7 +1508,7 @@ TEST_F(HttpConnectionManagerImplTest, AllowNonWebSocketOnWebSocketRoute) {
TEST_F(HttpConnectionManagerImplTest, WebSocketNoThreadLocalCluster) {
setup(false, "");

EXPECT_CALL(cluster_manager_, get(_)).WillOnce(Return(nullptr));
EXPECT_CALL(cluster_manager_, get(_)).Times(2).WillRepeatedly(Return(nullptr));
expectOnUpstreamInitFailure();
EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_active_.value());
EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_total_.value());
Expand Down Expand Up @@ -2639,9 +2640,19 @@ TEST_F(HttpConnectionManagerImplTest, FilterClearRouteCache) {
}));

setupFilterChain(3, 2);
const std::string fake_cluster1_name = "fake_cluster1";
const std::string fake_cluster2_name = "fake_cluster2";

Router::RouteConstSharedPtr route1 = std::make_shared<NiceMock<Router::MockRoute>>();
Router::RouteConstSharedPtr route2 = std::make_shared<NiceMock<Router::MockRoute>>();
std::shared_ptr<Upstream::MockThreadLocalCluster> fake_cluster1 =
std::make_shared<NiceMock<Upstream::MockThreadLocalCluster>>();
EXPECT_CALL(cluster_manager_, get(_))
.WillOnce(Return(fake_cluster1.get()))
.WillOnce(Return(nullptr));

std::shared_ptr<Router::MockRoute> route1 = std::make_shared<NiceMock<Router::MockRoute>>();
EXPECT_CALL(route1->route_entry_, clusterName()).WillRepeatedly(ReturnRef(fake_cluster1_name));
std::shared_ptr<Router::MockRoute> route2 = std::make_shared<NiceMock<Router::MockRoute>>();
EXPECT_CALL(route2->route_entry_, clusterName()).WillRepeatedly(ReturnRef(fake_cluster2_name));

EXPECT_CALL(*route_config_provider_.route_config_, route(_, _))
.WillOnce(Return(route1))
Expand All @@ -2652,18 +2663,22 @@ TEST_F(HttpConnectionManagerImplTest, FilterClearRouteCache) {
.WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus {
EXPECT_EQ(route1, decoder_filters_[0]->callbacks_->route());
EXPECT_EQ(route1->routeEntry(), decoder_filters_[0]->callbacks_->streamInfo().routeEntry());
EXPECT_EQ(fake_cluster1->info(), decoder_filters_[0]->callbacks_->clusterInfo());
decoder_filters_[0]->callbacks_->clearRouteCache();
return FilterHeadersStatus::Continue;
}));
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true))
.WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus {
EXPECT_EQ(route2, decoder_filters_[1]->callbacks_->route());
EXPECT_EQ(route2->routeEntry(), decoder_filters_[1]->callbacks_->streamInfo().routeEntry());
// RDS & CDS consistency problem: route2 points to fake_cluster2, which doesn't exist.
EXPECT_EQ(nullptr, decoder_filters_[1]->callbacks_->clusterInfo());
decoder_filters_[1]->callbacks_->clearRouteCache();
return FilterHeadersStatus::Continue;
}));
EXPECT_CALL(*decoder_filters_[2], decodeHeaders(_, true))
.WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus {
EXPECT_EQ(nullptr, decoder_filters_[2]->callbacks_->clusterInfo());
EXPECT_EQ(nullptr, decoder_filters_[2]->callbacks_->route());
EXPECT_EQ(nullptr, decoder_filters_[2]->callbacks_->streamInfo().routeEntry());
return FilterHeadersStatus::StopIteration;
Expand Down
5 changes: 3 additions & 2 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -757,8 +757,9 @@ TEST_F(ClusterManagerImplTest, ShutdownOrder) {
EXPECT_EQ(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0],
cluster_manager_->get("cluster_1")->loadBalancer().chooseHost(nullptr));

// Local reference, primary reference, thread local reference, host reference.
EXPECT_EQ(4U, cluster.info().use_count());
// Local reference, primary reference, thread local reference, host reference, async client
// reference.
EXPECT_EQ(5U, cluster.info().use_count());

// Thread local reference should be gone.
factory_.tls_.shutdownThread();
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/http/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ MockFilterChainFactory::MockFilterChainFactory() {}
MockFilterChainFactory::~MockFilterChainFactory() {}

template <class T> static void initializeMockStreamFilterCallbacks(T& callbacks) {
callbacks.cluster_info_.reset(new NiceMock<Upstream::MockClusterInfo>());
callbacks.route_.reset(new NiceMock<Router::MockRoute>());
ON_CALL(callbacks, dispatcher()).WillByDefault(ReturnRef(callbacks.dispatcher_));
ON_CALL(callbacks, streamInfo()).WillByDefault(ReturnRef(callbacks.stream_info_));
ON_CALL(callbacks, clusterInfo()).WillByDefault(Return(callbacks.cluster_info_));
ON_CALL(callbacks, route()).WillByDefault(Return(callbacks.route_));
}

Expand Down
4 changes: 4 additions & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "test/mocks/router/mocks.h"
#include "test/mocks/stream_info/mocks.h"
#include "test/mocks/tracing/mocks.h"
#include "test/mocks/upstream/cluster_info.h"
#include "test/mocks/upstream/host.h"
#include "test/test_common/printers.h"

Expand Down Expand Up @@ -177,6 +178,7 @@ class MockStreamFilterCallbacksBase {
Event::MockDispatcher dispatcher_;
testing::NiceMock<StreamInfo::MockStreamInfo> stream_info_;
std::shared_ptr<Router::MockRoute> route_;
std::shared_ptr<Upstream::MockClusterInfo> cluster_info_;
};

class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks,
Expand All @@ -189,6 +191,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks,
MOCK_METHOD0(connection, const Network::Connection*());
MOCK_METHOD0(dispatcher, Event::Dispatcher&());
MOCK_METHOD0(resetStream, void());
MOCK_METHOD0(clusterInfo, Upstream::ClusterInfoConstSharedPtr());
MOCK_METHOD0(route, Router::RouteConstSharedPtr());
MOCK_METHOD0(clearRouteCache, void());
MOCK_METHOD0(streamId, uint64_t());
Expand Down Expand Up @@ -252,6 +255,7 @@ class MockStreamEncoderFilterCallbacks : public StreamEncoderFilterCallbacks,
MOCK_METHOD0(connection, const Network::Connection*());
MOCK_METHOD0(dispatcher, Event::Dispatcher&());
MOCK_METHOD0(resetStream, void());
MOCK_METHOD0(clusterInfo, Upstream::ClusterInfoConstSharedPtr());
MOCK_METHOD0(route, Router::RouteConstSharedPtr());
MOCK_METHOD0(clearRouteCache, void());
MOCK_METHOD0(streamId, uint64_t());
Expand Down

0 comments on commit c5be849

Please sign in to comment.