diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 7d879edc9806..192bb1b36a7f 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -184,5 +184,9 @@ new_features: change: | Made the :ref:`credential injector filter ` work as an upstream filter. - +- area: tcp_proxy + change: | + Added :ref:`an option <config_network_filters_tcp_proxy_receive_before_connect>` to allow filters to read from the + downstream connection before TCP proxy has opened the upstream connection, by setting a filter state object for the key + ``envoy.tcp_proxy.receive_before_connect``. deprecated: diff --git a/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst b/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst index 7917eef4b358..891065a97fc5 100644 --- a/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst +++ b/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst @@ -37,6 +37,19 @@ To define metadata that a suitable upstream host must match, use one of the foll In addition, dynamic metadata can be set by earlier network filters on the ``StreamInfo``. Setting the dynamic metadata must happen before ``onNewConnection()`` is called on the ``TcpProxy`` filter to affect load balancing. +.. _config_network_filters_tcp_proxy_receive_before_connect: + +Early reception and delayed upstream connection establishment +------------------------------------------------------------- + +The ``TcpProxy`` filter normally disables reading on the downstream connection until the upstream connection has been established. In some situations, earlier filters in the filter chain may need to read data from the downstream connection before allowing the upstream connection to be established (for an example, see https://github.com/envoyproxy/envoy/issues/9023). +This can be done by setting the ``StreamInfo`` filter state object for the key ``envoy.tcp_proxy.receive_before_connect`` to ``true``.
Setting this filter state must happen in the ``initializeReadFilterCallbacks()`` callback of the network filter so that it is done before the ``TcpProxy`` filter is initialized. + +When the ``envoy.tcp_proxy.receive_before_connect`` filter state is set, it is possible that the ``TcpProxy`` filter receives data before the upstream connection has been established. +In such a case, the ``TcpProxy`` filter buffers the data it receives before the upstream connection has been established and flushes it once the upstream connection is established. +Filters can also delay the upstream connection setup by returning ``StopIteration`` from their ``onNewConnection`` and ``onData`` callbacks. +On receiving early data, the ``TcpProxy`` filter read-disables the downstream connection until the upstream connection is established. This protects the early data buffer from overflowing. + .. _config_network_filters_tcp_proxy_tunneling_over_http: Tunneling TCP over HTTP @@ -72,6 +85,7 @@ The downstream statistics are rooted at *tcp.<stat_prefix>.* with the following downstream_cx_rx_bytes_buffered, Gauge, Total bytes currently buffered from the downstream connection downstream_flow_control_paused_reading_total, Counter, Total number of times flow control paused reading from downstream downstream_flow_control_resumed_reading_total, Counter, Total number of times flow control resumed reading from downstream + early_data_received_count_total, Counter, Total number of connections where tcp proxy received data before upstream connection establishment is complete idle_timeout, Counter, Total number of connections closed due to idle timeout max_downstream_connection_duration, Counter, Total number of connections closed due to max_downstream_connection_duration timeout on_demand_cluster_attempt, Counter, Total number of connections that requested on demand cluster diff --git a/source/common/tcp_proxy/BUILD b/source/common/tcp_proxy/BUILD index c1d1551424d8..27034f0f8d88 100644 --- a/source/common/tcp_proxy/BUILD +++
b/source/common/tcp_proxy/BUILD @@ -58,6 +58,7 @@ envoy_cc_library( "//envoy/stats:stats_interface", "//envoy/stats:stats_macros", "//envoy/stats:timespan_interface", + "//envoy/stream_info:bool_accessor_interface", "//envoy/stream_info:filter_state_interface", "//envoy/tcp:conn_pool_interface", "//envoy/tcp:upstream_interface", diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index 4ac1fec7ae8d..0c427d85d467 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -12,6 +12,7 @@ #include "envoy/extensions/filters/network/tcp_proxy/v3/tcp_proxy.pb.validate.h" #include "envoy/registry/registry.h" #include "envoy/stats/scope.h" +#include "envoy/stream_info/bool_accessor.h" #include "envoy/upstream/cluster_manager.h" #include "envoy/upstream/upstream.h" @@ -300,10 +301,25 @@ void Filter::initialize(Network::ReadFilterCallbacks& callbacks, bool set_connec ASSERT(getStreamInfo().getDownstreamBytesMeter() == nullptr); ASSERT(getStreamInfo().getUpstreamBytesMeter() != nullptr); - // Need to disable reads so that we don't write to an upstream that might fail - // in onData(). This will get re-enabled when the upstream connection is - // established. - read_callbacks_->connection().readDisable(true); + const StreamInfo::BoolAccessor* receive_before_connect = + read_callbacks_->connection() + .streamInfo() + .filterState() + ->getDataReadOnly<StreamInfo::BoolAccessor>(ReceiveBeforeConnectKey); + + // If receive_before_connect is set, we will not read disable the downstream connection + // as a filter before TCP_PROXY has set this state so that it can process data before + // the upstream connection is established. + if (receive_before_connect != nullptr && receive_before_connect->value()) { + ENVOY_CONN_LOG(debug, "receive_before_connect is enabled", read_callbacks_->connection()); + receive_before_connect_ = true; + } else { + // Need to disable reads so that we don't write to an upstream that might fail + // in onData().
This will get re-enabled when the upstream connection is + // established. + read_callbacks_->connection().readDisable(true); + } + getStreamInfo().setDownstreamBytesMeter(std::make_shared()); getStreamInfo().setUpstreamInfo(std::make_shared()); @@ -545,8 +561,12 @@ Network::FilterStatus Filter::establishUpstreamConnection() { // cluster->trafficStats()->upstream_cx_none_healthy in the latter case. getStreamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoHealthyUpstream); onInitFailure(UpstreamFailureReason::NoHealthyUpstream); + return Network::FilterStatus::StopIteration; } - return Network::FilterStatus::StopIteration; + // If receive before connect is set, allow the FilterChain iteration to + // continue so that the other filters in the filter chain can process the data. + return receive_before_connect_ ? Network::FilterStatus::Continue + : Network::FilterStatus::StopIteration; } void Filter::onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus cluster_status) { @@ -795,12 +815,32 @@ Network::FilterStatus Filter::onData(Buffer::Instance& data, bool end_stream) { if (upstream_) { getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(data.length()); upstream_->encodeData(data, end_stream); + resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive? + } else if (receive_before_connect_) { + ENVOY_CONN_LOG(trace, "Early data received. Length: {}", read_callbacks_->connection(), + data.length()); + + // Buffer data received before upstream connection exists. + early_data_buffer_.move(data); + + // TCP_PROXY cannot correctly make a decision on the amount of data + // the preceding filters need to read before the upstream connection is established. + // Hence, to protect the early data buffer, TCP_PROXY read disables the downstream on + // receiving the first chunk of data. 
The filter setting the receive_before_connect state should + // have a limit on the amount of data it needs to read before the upstream connection is + // established and pause the filter chain (by returning `StopIteration`) till it has read the + // data it needs or a max limit has been reached. + read_callbacks_->connection().readDisable(true); + + config_->stats().early_data_received_count_total_.inc(); + if (!early_data_end_stream_) { + early_data_end_stream_ = end_stream; + } } // The upstream should consume all of the data. // Before there is an upstream the connection should be readDisabled. If the upstream is // destroyed, there should be no further reads as well. ASSERT(0 == data.length()); - resetIdleTimer(); // TODO(ggreenway) PERF: do we need to reset timer on both send and receive? return Network::FilterStatus::StopIteration; } @@ -947,9 +987,25 @@ void Filter::onConnectMaxAttempts() { void Filter::onUpstreamConnection() { connecting_ = false; - // Re-enable downstream reads now that the upstream connection is established - // so we have a place to send downstream data to. - read_callbacks_->connection().readDisable(false); + + // If any data or a bare end_stream was received before the upstream connection was + // established, flush it upstream; otherwise the half-close is lost and reads stay disabled. + if (early_data_buffer_.length() > 0 || early_data_end_stream_) { + // Early data should only happen when receive_before_connect is enabled. + ASSERT(receive_before_connect_); + + ENVOY_CONN_LOG(trace, "TCP:onUpstreamEvent() Flushing early data buffer to upstream", + read_callbacks_->connection()); + getStreamInfo().getUpstreamBytesMeter()->addWireBytesSent(early_data_buffer_.length()); + upstream_->encodeData(early_data_buffer_, early_data_end_stream_); + ASSERT(0 == early_data_buffer_.length()); + + // Re-enable downstream reads now that the early data buffer is flushed.
+ read_callbacks_->connection().readDisable(false); + } else if (!receive_before_connect_) { + // Re-enable downstream reads now that the upstream connection is established + read_callbacks_->connection().readDisable(false); + } read_callbacks_->upstreamHost()->outlierDetector().putResult( Upstream::Outlier::Result::LocalOriginConnectSuccessFinal); diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index 5dee1944706b..a207afa1e675 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -23,6 +23,7 @@ #include "envoy/upstream/cluster_manager.h" #include "envoy/upstream/upstream.h" +#include "source/common/buffer/buffer_impl.h" #include "source/common/common/assert.h" #include "source/common/common/logger.h" #include "source/common/formatter/substitution_format_string.h" @@ -43,6 +44,15 @@ namespace TcpProxy { constexpr absl::string_view PerConnectionIdleTimeoutMs = "envoy.tcp_proxy.per_connection_idle_timeout_ms"; +/** + * ReceiveBeforeConnectKey is the key for the receive_before_connect filter state. The + * filter state value is a ``StreamInfo::BoolAccessor`` indicating whether the + * receive_before_connect functionality should be enabled. Network filters setting this filter + * state should return `StopIteration` in their `onNewConnection` and `onData` methods until they + * have read the data they need before the upstream connection establishment, and only then allow + * the filter chain to proceed to the TCP_PROXY filter. + */ +constexpr absl::string_view ReceiveBeforeConnectKey = "envoy.tcp_proxy.receive_before_connect"; /** * All tcp proxy stats. 
@see stats_macros.h @@ -54,6 +64,7 @@ constexpr absl::string_view PerConnectionIdleTimeoutMs = COUNTER(downstream_cx_tx_bytes_total) \ COUNTER(downstream_flow_control_paused_reading_total) \ COUNTER(downstream_flow_control_resumed_reading_total) \ + COUNTER(early_data_received_count_total) \ COUNTER(idle_timeout) \ COUNTER(max_downstream_connection_duration) \ COUNTER(upstream_flush_total) \ @@ -665,6 +676,14 @@ class Filter : public Network::ReadFilter, uint32_t connect_attempts_{}; bool connecting_{}; bool downstream_closed_{}; + // Stores the ReceiveBeforeConnect filter state value which can be set by preceding + // filters in the filter chain. When the filter state is set, TCP_PROXY doesn't disable + // downstream read during initialization. This feature can hence be used by preceding filters + // in the filter chain to read data from the downstream connection (for eg: to parse SNI) before + // the upstream connection is established. + bool receive_before_connect_{false}; + bool early_data_end_stream_{false}; + Buffer::OwnedImpl early_data_buffer_{}; HttpStreamDecoderFilterCallbacks upstream_decoder_filter_callbacks_; }; diff --git a/test/common/tcp_proxy/BUILD b/test/common/tcp_proxy/BUILD index 710b1a8aa858..332e0fda1251 100644 --- a/test/common/tcp_proxy/BUILD +++ b/test/common/tcp_proxy/BUILD @@ -24,6 +24,7 @@ envoy_cc_test_library( "//source/common/network:upstream_server_name_lib", "//source/common/network:upstream_socket_options_filter_state_lib", "//source/common/stats:stats_lib", + "//source/common/stream_info:bool_accessor_lib", "//source/common/tcp_proxy", "//source/common/upstream:upstream_includes", "//source/common/upstream:upstream_lib", diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index 02c0b776c644..589d2a55c506 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -21,6 +21,7 @@ #include 
"source/common/network/upstream_socket_options_filter_state.h" #include "source/common/network/win32_redirect_records_option_impl.h" #include "source/common/router/metadatamatchcriteria_impl.h" +#include "source/common/stream_info/bool_accessor_impl.h" #include "source/common/stream_info/uint64_accessor_impl.h" #include "source/common/tcp_proxy/tcp_proxy.h" #include "source/common/upstream/upstream_impl.h" @@ -71,7 +72,7 @@ class TcpProxyTest : public TcpProxyTestBase { })); } using TcpProxyTestBase::setup; - void setup(uint32_t connections, bool set_redirect_records, + void setup(uint32_t connections, bool set_redirect_records, bool receive_before_connect, const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config) override { if (config.has_on_demand()) { EXPECT_CALL(factory_context_.server_factory_context_.cluster_manager_, @@ -142,17 +143,31 @@ class TcpProxyTest : public TcpProxyTestBase { ->addOption( Network::SocketOptionFactory::buildWFPRedirectRecordsOptions(*redirect_records)); } + + filter_callbacks_.connection().streamInfo().filterState()->setData( + TcpProxy::ReceiveBeforeConnectKey, + std::make_unique(receive_before_connect), + StreamInfo::FilterState::StateType::ReadOnly, + StreamInfo::FilterState::LifeSpan::Connection); + filter_ = std::make_unique(config_, factory_context_.server_factory_context_.cluster_manager_); EXPECT_CALL(filter_callbacks_.connection_, enableHalfClose(true)); - EXPECT_CALL(filter_callbacks_.connection_, readDisable(true)); + + if (!receive_before_connect) { + EXPECT_CALL(filter_callbacks_.connection_, readDisable(true)); + } + filter_->initializeReadFilterCallbacks(filter_callbacks_); filter_callbacks_.connection_.stream_info_.downstream_connection_info_provider_ ->setSslConnection(filter_callbacks_.connection_.ssl()); } if (connections > 0) { - EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection()); + auto expected_status_on_new_connection = receive_before_connect + ? 
Network::FilterStatus::Continue + : Network::FilterStatus::StopIteration; + EXPECT_EQ(expected_status_on_new_connection, filter_->onNewConnection()); EXPECT_EQ(absl::optional(), filter_->computeHashKey()); EXPECT_EQ(&filter_callbacks_.connection_, filter_->downstreamConnection()); EXPECT_EQ(nullptr, filter_->metadataMatchCriteria()); @@ -735,6 +750,82 @@ TEST_P(TcpProxyTest, UpstreamDisconnectDownstreamFlowControl) { filter_callbacks_.connection_.runLowWatermarkCallbacks(); } +TEST_P(TcpProxyTest, ReceiveBeforeConnectBuffersOnEarlyData) { + setup(/*connections=*/1, /*set_redirect_records=*/false, /*receive_before_connect=*/true); + std::string early_data("early data"); + Buffer::OwnedImpl early_data_buffer(early_data); + + // Check that the early data is buffered and flushed to upstream when connection is established. + // Also check that downstream connection is read disabled. + EXPECT_CALL(*upstream_connections_.at(0), write(_, _)).Times(0); + EXPECT_CALL(filter_callbacks_.connection_, readDisable(true)); + filter_->onData(early_data_buffer, /*end_stream=*/false); + + // Now when upstream connection is established, early buffer will be sent. + EXPECT_CALL(*upstream_connections_.at(0), write(BufferStringEqual(early_data), false)); + raiseEventUpstreamConnected(/*conn_index=*/0); + + // Any further communications between client and server can resume normally. 
+ Buffer::OwnedImpl buffer("hello"); + EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _)); + filter_->onData(buffer, false); + + Buffer::OwnedImpl response("world"); + EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); + upstream_callbacks_->onUpstreamData(response, false); +} + +TEST_P(TcpProxyTest, ReceiveBeforeConnectEarlyDataWithEndStream) { + setup(/*connections=*/1, /*set_redirect_records=*/false, /*receive_before_connect=*/true); + std::string early_data("early data"); + Buffer::OwnedImpl early_data_buffer(early_data); + + // Early data is sent and downstream connection has indicated end of stream. + EXPECT_CALL(*upstream_connections_.at(0), write(_, _)).Times(0); + EXPECT_CALL(filter_callbacks_.connection_, readDisable(true)); + filter_->onData(early_data_buffer, /*end_stream=*/true); + + // Now when upstream connection is established, early buffer will be sent. + EXPECT_CALL(*upstream_connections_.at(0), + write(BufferStringEqual(early_data), /*end_stream*/ true)); + raiseEventUpstreamConnected(/*conn_index=*/0); + + // Any further communications between client and server can resume normally. + Buffer::OwnedImpl response("hello"); + EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); + upstream_callbacks_->onUpstreamData(response, false); +} + +TEST_P(TcpProxyTest, ReceiveBeforeConnectNoEarlyData) { + setup(1, /*set_redirect_records=*/false, /*receive_before_connect=*/true); + raiseEventUpstreamConnected(/*conn_index=*/0, /*expect_read_enable=*/false); + + // Any data sent after upstream connection is established is flushed directly to upstream, + // and downstream connection is not read disabled. 
+ Buffer::OwnedImpl buffer("hello"); + EXPECT_CALL(filter_callbacks_.connection_, readDisable(_)).Times(0); + EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _)); + filter_->onData(buffer, /*end_stream=*/false); + + Buffer::OwnedImpl response("world"); + EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); + upstream_callbacks_->onUpstreamData(response, false); +} + +TEST_P(TcpProxyTest, ReceiveBeforeConnectSetToFalse) { + setup(1, /*set_redirect_records=*/false, /*receive_before_connect=*/false); + raiseEventUpstreamConnected(/*conn_index=*/0, /*expect_read_enable=*/true); + + // Any further communications between client and server can resume normally. + Buffer::OwnedImpl buffer("hello"); + EXPECT_CALL(*upstream_connections_.at(0), write(BufferEqual(&buffer), _)); + filter_->onData(buffer, false); + + Buffer::OwnedImpl response("world"); + EXPECT_CALL(filter_callbacks_.connection_, write(BufferEqual(&response), _)); + upstream_callbacks_->onUpstreamData(response, false); +} + TEST_P(TcpProxyTest, DownstreamDisconnectRemote) { setup(1); @@ -1642,7 +1733,7 @@ TEST_P(TcpProxyTest, UpstreamSocketOptionsReturnedEmpty) { } TEST_P(TcpProxyTest, TcpProxySetRedirectRecordsToUpstream) { - setup(1, true); + setup(/*connections=*/1, /*set_redirect_records=*/true, /*receive_before_connect=*/false); EXPECT_TRUE(filter_->upstreamSocketOptions()); auto iterator = std::find_if( filter_->upstreamSocketOptions()->begin(), filter_->upstreamSocketOptions()->end(), diff --git a/test/common/tcp_proxy/tcp_proxy_test_base.h b/test/common/tcp_proxy/tcp_proxy_test_base.h index a7a7770d218a..c5c0e7820a67 100644 --- a/test/common/tcp_proxy/tcp_proxy_test_base.h +++ b/test/common/tcp_proxy/tcp_proxy_test_base.h @@ -105,23 +105,28 @@ class TcpProxyTestBase : public testing::TestWithParam { return config; } - void setup(uint32_t connections) { setup(connections, false, defaultConfig()); } + void setup(uint32_t connections) { + setup(connections, 
/*set_redirect_records=*/false, /*receive_before_connect=*/false, + defaultConfig()); + } void setup(uint32_t connections, const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config) { - setup(connections, false, config); + setup(connections, /*set_redirect_records=*/false, /*receive_before_connect=*/false, config); } - void setup(uint32_t connections, bool set_redirect_records) { - setup(connections, set_redirect_records, defaultConfig()); + void setup(uint32_t connections, bool set_redirect_records, bool receive_before_connect) { + setup(connections, set_redirect_records, receive_before_connect, defaultConfig()); } virtual void - setup(uint32_t connections, bool set_redirect_records, + setup(uint32_t connections, bool set_redirect_records, bool receive_before_connect, const envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy& config) PURE; - void raiseEventUpstreamConnected(uint32_t conn_index) { - EXPECT_CALL(filter_callbacks_.connection_, readDisable(false)); + void raiseEventUpstreamConnected(uint32_t conn_index, bool expect_read_enable = true) { + if (expect_read_enable) { + EXPECT_CALL(filter_callbacks_.connection_, readDisable(false)); + } EXPECT_CALL(*upstream_connection_data_.at(conn_index), addUpstreamCallbacks(_)) .WillOnce(Invoke([=, this](Tcp::ConnectionPool::UpstreamCallbacks& cb) -> void { upstream_callbacks_ = &cb; diff --git a/test/integration/BUILD b/test/integration/BUILD index 098561c65c66..f79e364703e0 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -1888,6 +1888,7 @@ envoy_cc_test( "//source/common/event:dispatcher_includes", "//source/common/event:dispatcher_lib", "//source/common/network:utility_lib", + "//source/common/stream_info:bool_accessor_lib", "//source/common/tls:context_config_lib", "//source/common/tls:context_lib", "//source/extensions/access_loggers/file:config", diff --git a/test/integration/tcp_proxy_integration_test.cc b/test/integration/tcp_proxy_integration_test.cc index 
503b2eb4d64d..22e3edf61510 100644 --- a/test/integration/tcp_proxy_integration_test.cc +++ b/test/integration/tcp_proxy_integration_test.cc @@ -12,6 +12,8 @@ #include "source/common/config/api_version.h" #include "source/common/network/utility.h" +#include "source/common/stream_info/bool_accessor_impl.h" +#include "source/common/tcp_proxy/tcp_proxy.h" #include "source/common/tls/context_manager_impl.h" #include "source/extensions/filters/network/common/factory_base.h" @@ -1716,4 +1718,181 @@ INSTANTIATE_TEST_SUITE_P(TcpProxyIntegrationTestParams, MysqlIntegrationTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), TestUtility::ipTestParamsToString); +// Test read filter that sets the receive_before_connect filter state and stops filter chain +// iteration until onData() sees a buffer of at least the configured size. +class PauseFilter : public Network::ReadFilter { +public: + explicit PauseFilter(uint64_t data_size_before_continue) + : data_size_before_continue_(data_size_before_continue) {} + + Network::FilterStatus onData(Buffer::Instance& buffer, bool) override { + // Once the initial buffer size is reached, the filter chain will be continued. + should_continue_ = should_continue_ || buffer.length() >= data_size_before_continue_; + return should_continue_ ? Network::FilterStatus::Continue + : Network::FilterStatus::StopIteration; + } + + Network::FilterStatus onNewConnection() override { + // Stop Iteration as more data is needed before filter chain can be continued. + return Network::FilterStatus::StopIteration; + } + + void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override { + read_callbacks_ = &callbacks; + + // Pass ReceiveBeforeConnect state to TCP_PROXY so that it does not read disable + // the connection. + read_callbacks_->connection().streamInfo().filterState()->setData( + TcpProxy::ReceiveBeforeConnectKey, std::make_unique<StreamInfo::BoolAccessorImpl>(true), + StreamInfo::FilterState::StateType::ReadOnly, + StreamInfo::FilterState::LifeSpan::Connection); + } + + // Whether the filter chain should be continued.
+ bool should_continue_{false}; + // The number of bytes to receive before the filter chain is continued. + uint64_t data_size_before_continue_{}; + Network::ReadFilterCallbacks* read_callbacks_{}; +}; + +// Factory that creates PauseFilter instances under the filter name "test.pause_iteration". +class PauseFilterFactory : public Extensions::NetworkFilters::Common::FactoryBase< + test::integration::tcp_proxy::PauseFilterConfig> { +public: + PauseFilterFactory() : FactoryBase("test.pause_iteration") {} + +private: + Network::FilterFactoryCb + createFilterFactoryFromProtoTyped(const test::integration::tcp_proxy::PauseFilterConfig& cfg, + Server::Configuration::FactoryContext&) override { + const uint32_t data_size_before_continue = cfg.data_size_before_continue(); + return [data_size_before_continue]( + Network::FilterManager& filter_manager) -> void { + filter_manager.addReadFilter(std::make_shared<PauseFilter>(data_size_before_continue)); + }; + } +}; + +// Fixture that registers PauseFilterFactory and adds the pause filter to the listener config. +class TcpProxyReceiveBeforeConnectIntegrationTest : public TcpProxyIntegrationTest { +public: + void addPauseFilter(uint32_t data_size_before_continue) { + config_helper_.addNetworkFilter(fmt::format(R"EOF( + name: test.pause_iteration + typed_config: + "@type": type.googleapis.com/test.integration.tcp_proxy.PauseFilterConfig + data_size_before_continue: {} +)EOF", + data_size_before_continue)); + } + + PauseFilterFactory factory_; + Registry::InjectFactory<Server::Configuration::NamedNetworkFilterConfigFactory> register_factory_{ + factory_}; +}; + +INSTANTIATE_TEST_SUITE_P(TcpProxyIntegrationTestParams, TcpProxyReceiveBeforeConnectIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +TEST_P(TcpProxyReceiveBeforeConnectIntegrationTest, ReceiveBeforeConnectEarlyData) { + uint32_t data_size_before_continue = 6; + addPauseFilter(data_size_before_continue); + + initialize(); + FakeRawConnectionPtr fake_upstream_connection; + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); + + // Until at least 6 bytes of data are received, the PauseFilter stops the
iteration. Downstream counter + // is incremented, but no connection attempt to upstream is made. + ASSERT_TRUE(tcp_client->write("hello")); + test_server_->waitForCounterEq("tcp.tcpproxy_stats.downstream_cx_total", 1); + EXPECT_EQ(0, test_server_->counter("cluster.cluster_0.upstream_cx_total")->value()); + + ASSERT_TRUE(tcp_client->write("world")); + test_server_->waitForCounterEq("tcp.tcpproxy_stats.early_data_received_count_total", 1); + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + EXPECT_EQ(1, test_server_->counter("cluster.cluster_0.upstream_cx_total")->value()); + + // Once the connection is established, the early data will be flushed to the upstream. + ASSERT_TRUE(fake_upstream_connection->waitForData(FakeRawConnection::waitForMatch("helloworld"))); + ASSERT_TRUE(fake_upstream_connection->write("response")); + ASSERT_TRUE(fake_upstream_connection->close()); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); + tcp_client->waitForHalfClose(); + tcp_client->close(); +} + +TEST_P(TcpProxyReceiveBeforeConnectIntegrationTest, UpstreamBufferHighWatermark) { + /* This test validates that no inconsistent state happens when the downstream is read disabled + * twice, once when the early data is received and next when the upstream buffer hits high + * watermark. + */ + const uint32_t data_size_before_continue = 512; + addPauseFilter(data_size_before_continue); + + const uint32_t upstream_buffer_limit = 512; + const uint32_t downstream_buffer_limit = 1024; + const uint32_t data_size = 16 * downstream_buffer_limit; + + config_helper_.setBufferLimits(upstream_buffer_limit, downstream_buffer_limit); + std::string data; + for (uint32_t i = 0; i < data_size / 4; i++) + data += "abcd"; + + initialize(); + FakeRawConnectionPtr fake_upstream_connection; + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); + + // PauseFilter stops the iteration until sufficient data is received. 
+ ASSERT_TRUE(tcp_client->write(data.substr(0, upstream_buffer_limit - 1))); + test_server_->waitForCounterEq("tcp.tcpproxy_stats.downstream_cx_total", 1); + EXPECT_EQ(0, test_server_->counter("cluster.cluster_0.upstream_cx_total")->value()); + + // Downstream sends more data. PauseFilter allows the iteration to continue, upstream connection + // is established. The buffered early data is sent to the upstream. + ASSERT_TRUE(tcp_client->write(data.substr(upstream_buffer_limit - 1))); + test_server_->waitForCounterEq("tcp.tcpproxy_stats.early_data_received_count_total", 1); + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + EXPECT_EQ(1, test_server_->counter("cluster.cluster_0.upstream_cx_total")->value()); + + // At this point, the downstream is already read disabled, waiting for early data flush. Another + // downstream read disable is triggered as the early data is pushed to the upstream buffer and the + // buffer hits the high watermark. Downstream read disable counter will be 2. After the early data + // is pushed, downstream is read enabled once. Downstream read disable counter will be 1. After + // the upstream buffer is flushed to the upstream, it comes below the watermark and downstream is + // read enabled. 
+ ASSERT_TRUE(fake_upstream_connection->waitForData(FakeRawConnection::waitForMatch(data.c_str()))); + ASSERT_TRUE(fake_upstream_connection->waitForData(data_size)); + ASSERT_TRUE(fake_upstream_connection->write("response")); + ASSERT_TRUE(fake_upstream_connection->close()); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); + tcp_client->waitForHalfClose(); + tcp_client->close(); + + uint32_t upstream_pauses = + test_server_->counter("cluster.cluster_0.upstream_flow_control_paused_reading_total") + ->value(); + uint32_t upstream_resumes = + test_server_->counter("cluster.cluster_0.upstream_flow_control_resumed_reading_total") + ->value(); + + uint32_t downstream_pauses = + test_server_->counter("tcp.tcpproxy_stats.downstream_flow_control_paused_reading_total") + ->value(); + uint32_t downstream_resumes = + test_server_->counter("tcp.tcpproxy_stats.downstream_flow_control_resumed_reading_total") + ->value(); + + EXPECT_EQ(upstream_pauses, upstream_resumes); + EXPECT_EQ(upstream_resumes, 0); + + // Since we are receiving early data, downstream connection will already be read + // disabled so downstream pause metric is not emitted when upstream buffer hits high + // watermark. When the upstream buffer watermark goes down, downstream will be read + // enabled and downstream resume metric will be emitted. + EXPECT_EQ(downstream_pauses, 0); + EXPECT_EQ(downstream_resumes, 1); +} + } // namespace Envoy diff --git a/test/integration/tcp_proxy_integration_test.proto b/test/integration/tcp_proxy_integration_test.proto index 6167ab80c903..b882572cd9cd 100644 --- a/test/integration/tcp_proxy_integration_test.proto +++ b/test/integration/tcp_proxy_integration_test.proto @@ -5,3 +5,7 @@ package test.integration.tcp_proxy; message InjectDynamicMetadata { string key = 1; } + +message PauseFilterConfig { + uint32 data_size_before_continue = 1; +}