From c5be849b5d9c35b6c416eaeb6cdc1d123e1ed75c Mon Sep 17 00:00:00 2001 From: Xin Date: Mon, 8 Oct 2018 14:44:57 -0400 Subject: [PATCH] Clusterinfo consistency (#4600) Signed-off-by: Xin Zhuang --- include/envoy/http/filter.h | 9 ++++++++ source/common/http/async_client_impl.cc | 6 +++--- source/common/http/async_client_impl.h | 6 ++++-- source/common/http/conn_manager_impl.cc | 19 ++++++++++++++++- source/common/http/conn_manager_impl.h | 2 ++ .../common/upstream/cluster_manager_impl.cc | 2 +- .../grpc_client_integration_test_harness.h | 2 +- test/common/http/async_client_impl_test.cc | 5 ++++- test/common/http/conn_manager_impl_test.cc | 21 ++++++++++++++++--- .../upstream/cluster_manager_impl_test.cc | 5 +++-- test/mocks/http/mocks.cc | 2 ++ test/mocks/http/mocks.h | 4 ++++ 12 files changed, 69 insertions(+), 14 deletions(-) diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index 6c95d9104eb1..519065fb117d 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -12,6 +12,7 @@ #include "envoy/router/router.h" #include "envoy/ssl/connection.h" #include "envoy/tracing/http_tracer.h" +#include "envoy/upstream/upstream.h" namespace Envoy { namespace Http { @@ -111,6 +112,14 @@ class StreamFilterCallbacks { */ virtual Router::RouteConstSharedPtr route() PURE; + /** + * Returns the clusterInfo for the cached route. + * This method is to avoid multiple look ups in the filter chain, it also provides a consistent + * view of clusterInfo after a route is picked/repicked. + * NOTE: Cached clusterInfo and route will be updated the same time. + */ + virtual Upstream::ClusterInfoConstSharedPtr clusterInfo() PURE; + /** * Clears the route cache for the current request. This must be called when a filter has modified * the headers in a way that would affect routing. diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 9d8e76004b0f..5fd5e7953b1c 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -29,8 +29,8 @@ const AsyncStreamImpl::NullPathMatchCriterion AsyncStreamImpl::RouteEntryImpl::path_match_criterion_; const std::list AsyncStreamImpl::NullConfig::internal_only_headers_; -AsyncClientImpl::AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store, - Event::Dispatcher& dispatcher, +AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster, + Stats::Store& stats_store, Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Upstream::ClusterManager& cm, Runtime::Loader& runtime, Runtime::RandomGenerator& random, @@ -80,7 +80,7 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal : parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()), router_(parent.config_), stream_info_(Protocol::Http11, parent.dispatcher().timeSystem()), tracing_config_(Tracing::EgressConfig::get()), - route_(std::make_shared(parent_.cluster_.name(), timeout)) { + route_(std::make_shared(parent_.cluster_->name(), timeout)) { if (buffer_body_for_retry) { buffered_body_.reset(new Buffer::OwnedImpl()); } diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index b3bd09da3c6e..be2ca36f5634 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -19,6 +19,7 @@ #include "envoy/router/shadow_writer.h" #include "envoy/ssl/connection.h" #include "envoy/tracing/http_tracer.h" +#include "envoy/upstream/upstream.h" #include "common/common/empty_string.h" #include "common/common/linked_object.h" @@ -35,7 +36,7 @@ class AsyncRequestImpl; class AsyncClientImpl final : public AsyncClient { public: - AsyncClientImpl(const Upstream::ClusterInfo& cluster, Stats::Store& stats_store, + AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster, Stats::Store& stats_store, Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Upstream::ClusterManager& cm, Runtime::Loader& runtime, Runtime::RandomGenerator& random, Router::ShadowWriterPtr&& shadow_writer); @@ -52,7 +53,7 @@ class AsyncClientImpl final : public AsyncClient { Event::Dispatcher& dispatcher() override { return dispatcher_; } private: - const Upstream::ClusterInfo& cluster_; + Upstream::ClusterInfoConstSharedPtr cluster_; Router::FilterConfig config_; Event::Dispatcher& dispatcher_; std::list> active_streams_; @@ -267,6 +268,7 @@ class AsyncStreamImpl : public AsyncClient::Stream, Event::Dispatcher& dispatcher() override { return parent_.dispatcher_; } void resetStream() override; Router::RouteConstSharedPtr route() override { return route_; } + Upstream::ClusterInfoConstSharedPtr clusterInfo() override { return parent_.cluster_; } void clearRouteCache() override {} uint64_t streamId() override { return stream_id_; } StreamInfo::StreamInfo& streamInfo() override { return stream_info_; } diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 237a1b6ae8e7..2c702ab71349 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -951,6 +951,13 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() { Router::RouteConstSharedPtr route = snapped_route_config_->route(*request_headers_, stream_id_); stream_info_.route_entry_ = route ? route->routeEntry() : nullptr; cached_route_ = std::move(route); + if (nullptr == stream_info_.route_entry_) { + cached_cluster_info_ = nullptr; + } else { + Upstream::ThreadLocalCluster* local_cluster = + connection_manager_.cluster_manager_.get(stream_info_.route_entry_->clusterName()); + cached_cluster_info_ = (nullptr == local_cluster) ? nullptr : local_cluster->info(); + } } void ConnectionManagerImpl::ActiveStream::sendLocalReply( @@ -1477,8 +1484,17 @@ Tracing::Span& ConnectionManagerImpl::ActiveStreamFilterBase::activeSpan() { Tracing::Config& ConnectionManagerImpl::ActiveStreamFilterBase::tracingConfig() { return parent_; } +Upstream::ClusterInfoConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::clusterInfo() { + // NOTE: Refreshing route caches clusterInfo as well. + if (!parent_.cached_route_.has_value()) { + parent_.refreshCachedRoute(); + } + + return parent_.cached_cluster_info_.value(); +} + Router::RouteConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::route() { - if (!parent_.cached_route_) { + if (!parent_.cached_route_.has_value()) { parent_.refreshCachedRoute(); } @@ -1487,6 +1503,7 @@ Router::RouteConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::route void ConnectionManagerImpl::ActiveStreamFilterBase::clearRouteCache() { parent_.cached_route_ = absl::optional(); + parent_.cached_cluster_info_ = absl::optional(); } Buffer::WatermarkBufferPtr ConnectionManagerImpl::ActiveStreamDecoderFilter::createBuffer() { diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 89747dca1efb..6106a119ac33 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -120,6 +120,7 @@ class ConnectionManagerImpl : Logger::Loggable, Event::Dispatcher& dispatcher() override; void resetStream() override; Router::RouteConstSharedPtr route() override; + Upstream::ClusterInfoConstSharedPtr clusterInfo() override; void clearRouteCache() override; uint64_t streamId() override; StreamInfo::StreamInfo& streamInfo() override; @@ -403,6 +404,7 @@ class ConnectionManagerImpl : Logger::Loggable, State state_; StreamInfo::StreamInfoImpl stream_info_; absl::optional cached_route_; + absl::optional cached_cluster_info_; DownstreamWatermarkCallbacks* watermark_callbacks_{nullptr}; uint32_t buffer_limit_{0}; uint32_t high_watermark_count_{0}; diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 664447b771bc..dd7f461c9131 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -1017,7 +1017,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry( ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster, const LoadBalancerFactorySharedPtr& lb_factory) : parent_(parent), lb_factory_(lb_factory), cluster_info_(cluster), - http_async_client_(*cluster, parent.parent_.stats_, parent.thread_local_dispatcher_, + http_async_client_(cluster, parent.parent_.stats_, parent.thread_local_dispatcher_, parent.parent_.local_info_, parent.parent_, parent.parent_.runtime_, parent.parent_.random_, Router::ShadowWriterPtr{new Router::ShadowWriterImpl(parent.parent_)}) { diff --git a/test/common/grpc/grpc_client_integration_test_harness.h b/test/common/grpc/grpc_client_integration_test_harness.h index 1405c4ab4c1c..16b786d28ba4 100644 --- a/test/common/grpc/grpc_client_integration_test_harness.h +++ b/test/common/grpc/grpc_client_integration_test_harness.h @@ -268,7 +268,7 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest { EXPECT_CALL(cm_, httpConnPoolForCluster(_, _, _, _)) .WillRepeatedly(Return(http_conn_pool_.get())); http_async_client_ = std::make_unique( - *cluster_info_ptr_, *stats_store_, dispatcher_, local_info_, cm_, runtime_, random_, + cluster_info_ptr_, *stats_store_, dispatcher_, local_info_, cm_, runtime_, random_, std::move(shadow_writer_ptr_)); EXPECT_CALL(cm_, httpAsyncClientForCluster(fake_cluster_name_)) .WillRepeatedly(ReturnRef(*http_async_client_)); diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index c81ac4dffcfa..7503acc4c43a 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -35,7 +35,7 @@ namespace { class AsyncClientImplTest : public testing::Test { public: AsyncClientImplTest() - : client_(*cm_.thread_local_cluster_.cluster_.info_, stats_store_, dispatcher_, local_info_, + : client_(cm_.thread_local_cluster_.cluster_.info_, stats_store_, dispatcher_, local_info_, cm_, runtime_, random_, Router::ShadowWriterPtr{new NiceMock()}) { message_->headers().insertMethod().value(std::string("GET")); @@ -909,6 +909,9 @@ TEST_F(AsyncClientImplTest, RdsGettersTest) { EXPECT_EQ("", route_config.name()); EXPECT_EQ(0, route_config.internalOnlyHeaders().size()); EXPECT_EQ(nullptr, route_config.route(headers, 0)); + auto cluster_info = filter_callbacks->clusterInfo(); + ASSERT_NE(nullptr, cluster_info); + EXPECT_EQ(cm_.thread_local_cluster_.cluster_.info_, cluster_info); EXPECT_CALL(stream_callbacks_, onReset()); } diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index 07a459228188..acef820b88ef 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -36,6 +36,7 @@ #include "test/mocks/server/mocks.h" #include "test/mocks/ssl/mocks.h" #include "test/mocks/tracing/mocks.h" +#include "test/mocks/upstream/cluster_info.h" #include "test/mocks/upstream/mocks.h" #include "test/test_common/printers.h" #include "test/test_common/test_time.h" @@ -1507,7 +1508,7 @@ TEST_F(HttpConnectionManagerImplTest, AllowNonWebSocketOnWebSocketRoute) { TEST_F(HttpConnectionManagerImplTest, WebSocketNoThreadLocalCluster) { setup(false, ""); - EXPECT_CALL(cluster_manager_, get(_)).WillOnce(Return(nullptr)); + EXPECT_CALL(cluster_manager_, get(_)).Times(2).WillRepeatedly(Return(nullptr)); expectOnUpstreamInitFailure(); EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_active_.value()); EXPECT_EQ(1U, stats_.named_.downstream_cx_websocket_total_.value()); @@ -2639,9 +2640,19 @@ TEST_F(HttpConnectionManagerImplTest, FilterClearRouteCache) { })); setupFilterChain(3, 2); + const std::string fake_cluster1_name = "fake_cluster1"; + const std::string fake_cluster2_name = "fake_cluster2"; - Router::RouteConstSharedPtr route1 = std::make_shared>(); - Router::RouteConstSharedPtr route2 = std::make_shared>(); + std::shared_ptr fake_cluster1 = + std::make_shared>(); + EXPECT_CALL(cluster_manager_, get(_)) + .WillOnce(Return(fake_cluster1.get())) + .WillOnce(Return(nullptr)); + + std::shared_ptr route1 = std::make_shared>(); + EXPECT_CALL(route1->route_entry_, clusterName()).WillRepeatedly(ReturnRef(fake_cluster1_name)); + std::shared_ptr route2 = std::make_shared>(); + EXPECT_CALL(route2->route_entry_, clusterName()).WillRepeatedly(ReturnRef(fake_cluster2_name)); EXPECT_CALL(*route_config_provider_.route_config_, route(_, _)) .WillOnce(Return(route1)) @@ -2652,6 +2663,7 @@ TEST_F(HttpConnectionManagerImplTest, FilterClearRouteCache) { .WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus { EXPECT_EQ(route1, decoder_filters_[0]->callbacks_->route()); EXPECT_EQ(route1->routeEntry(), decoder_filters_[0]->callbacks_->streamInfo().routeEntry()); + EXPECT_EQ(fake_cluster1->info(), decoder_filters_[0]->callbacks_->clusterInfo()); decoder_filters_[0]->callbacks_->clearRouteCache(); return FilterHeadersStatus::Continue; })); @@ -2659,11 +2671,14 @@ TEST_F(HttpConnectionManagerImplTest, FilterClearRouteCache) { .WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus { EXPECT_EQ(route2, decoder_filters_[1]->callbacks_->route()); EXPECT_EQ(route2->routeEntry(), decoder_filters_[1]->callbacks_->streamInfo().routeEntry()); + // RDS & CDS consistency problem: route2 points to fake_cluster2, which doesn't exist. + EXPECT_EQ(nullptr, decoder_filters_[1]->callbacks_->clusterInfo()); decoder_filters_[1]->callbacks_->clearRouteCache(); return FilterHeadersStatus::Continue; })); EXPECT_CALL(*decoder_filters_[2], decodeHeaders(_, true)) .WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus { + EXPECT_EQ(nullptr, decoder_filters_[2]->callbacks_->clusterInfo()); EXPECT_EQ(nullptr, decoder_filters_[2]->callbacks_->route()); EXPECT_EQ(nullptr, decoder_filters_[2]->callbacks_->streamInfo().routeEntry()); return FilterHeadersStatus::StopIteration; diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 377d4018f450..12ae2f29e615 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -757,8 +757,9 @@ TEST_F(ClusterManagerImplTest, ShutdownOrder) { EXPECT_EQ(cluster.prioritySet().hostSetsPerPriority()[0]->hosts()[0], cluster_manager_->get("cluster_1")->loadBalancer().chooseHost(nullptr)); - // Local reference, primary reference, thread local reference, host reference. - EXPECT_EQ(4U, cluster.info().use_count()); + // Local reference, primary reference, thread local reference, host reference, async client + // reference. + EXPECT_EQ(5U, cluster.info().use_count()); // Thread local reference should be gone. factory_.tls_.shutdownThread(); diff --git a/test/mocks/http/mocks.cc b/test/mocks/http/mocks.cc index c9a8352c441d..47d115a07dd5 100644 --- a/test/mocks/http/mocks.cc +++ b/test/mocks/http/mocks.cc @@ -68,9 +68,11 @@ MockFilterChainFactory::MockFilterChainFactory() {} MockFilterChainFactory::~MockFilterChainFactory() {} template static void initializeMockStreamFilterCallbacks(T& callbacks) { + callbacks.cluster_info_.reset(new NiceMock()); callbacks.route_.reset(new NiceMock()); ON_CALL(callbacks, dispatcher()).WillByDefault(ReturnRef(callbacks.dispatcher_)); ON_CALL(callbacks, streamInfo()).WillByDefault(ReturnRef(callbacks.stream_info_)); + ON_CALL(callbacks, clusterInfo()).WillByDefault(Return(callbacks.cluster_info_)); ON_CALL(callbacks, route()).WillByDefault(Return(callbacks.route_)); } diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index af401faa66f8..ce95ac6c85d5 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -22,6 +22,7 @@ #include "test/mocks/router/mocks.h" #include "test/mocks/stream_info/mocks.h" #include "test/mocks/tracing/mocks.h" +#include "test/mocks/upstream/cluster_info.h" #include "test/mocks/upstream/host.h" #include "test/test_common/printers.h" @@ -177,6 +178,7 @@ class MockStreamFilterCallbacksBase { Event::MockDispatcher dispatcher_; testing::NiceMock stream_info_; std::shared_ptr route_; + std::shared_ptr cluster_info_; }; class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, @@ -189,6 +191,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, MOCK_METHOD0(connection, const Network::Connection*()); MOCK_METHOD0(dispatcher, Event::Dispatcher&()); MOCK_METHOD0(resetStream, void()); + MOCK_METHOD0(clusterInfo, Upstream::ClusterInfoConstSharedPtr()); MOCK_METHOD0(route, Router::RouteConstSharedPtr()); MOCK_METHOD0(clearRouteCache, void()); MOCK_METHOD0(streamId, uint64_t()); @@ -252,6 +255,7 @@ class MockStreamEncoderFilterCallbacks : public StreamEncoderFilterCallbacks, MOCK_METHOD0(connection, const Network::Connection*()); MOCK_METHOD0(dispatcher, Event::Dispatcher&()); MOCK_METHOD0(resetStream, void()); + MOCK_METHOD0(clusterInfo, Upstream::ClusterInfoConstSharedPtr()); MOCK_METHOD0(route, Router::RouteConstSharedPtr()); MOCK_METHOD0(clearRouteCache, void()); MOCK_METHOD0(streamId, uint64_t());