diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index c1d695ae5cde..4a7aacf253ab 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -222,6 +222,19 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { */ virtual void addDecodedData(Buffer::Instance& data, bool streaming_filter) PURE; + /** + * Decode data directly to subsequent filters in the filter chain. This method is used in + * advanced cases in which a filter needs full control over how subsequent filters view data, + * and does not want to make use of HTTP connection manager buffering. Using this method allows + * a filter to buffer data (or not) and then periodically inject data to subsequent filters, + * indicating end_stream at an appropriate time. This can be used to implement rate limiting, + * periodic data emission, etc. + * + * This method should only be called outside of callback context. I.e., do not call this method + * from within a filter's decodeData() call. + */ + virtual void decodeData(Buffer::Instance& data, bool end_stream) PURE; + /** * Adds decoded trailers. May only be called in decodeData when end_stream is set to true. * If called in any other context, an assertion will be triggered. @@ -474,6 +487,19 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks { */ virtual void addEncodedData(Buffer::Instance& data, bool streaming_filter) PURE; + /** + * Encode data directly to subsequent filters in the filter chain. This method is used in + * advanced cases in which a filter needs full control over how subsequent filters view data, + * and does not want to make use of HTTP connection manager buffering. Using this method allows + * a filter to buffer data (or not) and then periodically inject data to subsequent filters, + * indicating end_stream at an appropriate time. This can be used to implement rate limiting, + * periodic data emission, etc. + * + * This method should only be called outside of callback context. I.e., do not call this method + * from within a filter's encodeData() call. + */ + virtual void encodeData(Buffer::Instance& data, bool end_stream) PURE; + /** * Adds encoded trailers. May only be called in encodeData when end_stream is set to true. * If called in any other context, an assertion will be triggered. @@ -517,7 +543,7 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks { */ class StreamEncoderFilter : public StreamFilterBase { public: - /* + /** * Called with 100-continue headers. * * This is not folded into encodeHeaders because most Envoy users and filters diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 36d16d66fc8e..17c2f02c24d1 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -311,6 +311,7 @@ class AsyncStreamImpl : public AsyncClient::Stream, // filter which uses this function for buffering. ASSERT(buffered_body_ != nullptr); } + void decodeData(Buffer::Instance&, bool) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); } void modifyDecodingBuffer(std::function) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 42951c2c6b84..b84624a9f730 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -1734,6 +1734,11 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedData(Buffer::In parent_.addDecodedData(*this, data, streaming); } +void ConnectionManagerImpl::ActiveStreamDecoderFilter::decodeData(Buffer::Instance& data, + bool end_stream) { + parent_.decodeData(this, data, end_stream); +} + void ConnectionManagerImpl::ActiveStreamDecoderFilter::continueDecoding() { commonContinue(); } void ConnectionManagerImpl::ActiveStreamDecoderFilter::encode100ContinueHeaders( @@ -1850,6 +1855,11 @@ void ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedData(Buffer::In return parent_.addEncodedData(*this, data, streaming); } +void ConnectionManagerImpl::ActiveStreamEncoderFilter::encodeData(Buffer::Instance& data, + bool end_stream) { + parent_.encodeData(this, data, end_stream); +} + HeaderMap& ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedTrailers() { return parent_.addEncodedTrailers(); } diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 5c15c9a9711c..08d68ad919bd 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -171,6 +171,7 @@ class ConnectionManagerImpl : Logger::Loggable, // Http::StreamDecoderFilterCallbacks void addDecodedData(Buffer::Instance& data, bool streaming) override; + void decodeData(Buffer::Instance& data, bool end_stream) override; HeaderMap& addDecodedTrailers() override; void continueDecoding() override; const Buffer::Instance* decodingBuffer() override { @@ -253,6 +254,7 @@ class ConnectionManagerImpl : Logger::Loggable, // Http::StreamEncoderFilterCallbacks void addEncodedData(Buffer::Instance& data, bool streaming) override; + void encodeData(Buffer::Instance& data, bool end_stream) override; HeaderMap& addEncodedTrailers() override; void onEncoderFilterAboveWriteBufferHighWatermark() override; void onEncoderFilterBelowWriteBufferLowWatermark() override; diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index 9c9e90a9de79..536426371942 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -3422,6 +3422,180 @@ TEST_F(HttpConnectionManagerImplTest, AddDataWithStopAndContinue) { encoder_filters_[2]->callbacks_->continueEncoding(); } +// Use filter direct decode/encodeData() calls without trailers. +TEST_F(HttpConnectionManagerImplTest, FilterDirectDecodeEncodeDataNoTrailers) { + InSequence s; + setup(false, ""); + + EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { + StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_); + HeaderMapPtr headers{ + new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; + decoder->decodeHeaders(std::move(headers), false); + + Buffer::OwnedImpl fake_data("hello"); + decoder->decodeData(fake_data, true); + })); + + EXPECT_CALL(*route_config_provider_.route_config_, route(_, _)); + setupFilterChain(2, 2); + + EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::Continue)); + EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::StopIteration)); + + Buffer::OwnedImpl decode_buffer; + EXPECT_CALL(*decoder_filters_[0], decodeData(_, true)) + .WillOnce(Invoke([&](Buffer::Instance& data, bool) { + decode_buffer.move(data); + return FilterDataStatus::StopIterationNoBuffer; + })); + EXPECT_CALL(*decoder_filters_[0], decodeComplete()); + + // Kick off the incoming data. + Buffer::OwnedImpl fake_input("1234"); + conn_manager_->onData(fake_input, false); + + Buffer::OwnedImpl decoded_data_to_forward; + decoded_data_to_forward.move(decode_buffer, 2); + EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("he"), false)) + .WillOnce(Return(FilterDataStatus::StopIterationNoBuffer)); + decoder_filters_[0]->callbacks_->decodeData(decoded_data_to_forward, false); + + EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("llo"), true)) + .WillOnce(Return(FilterDataStatus::StopIterationNoBuffer)); + EXPECT_CALL(*decoder_filters_[1], decodeComplete()); + decoder_filters_[0]->callbacks_->decodeData(decode_buffer, true); + + // Response path. + EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::Continue)); + EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::Continue)); + EXPECT_CALL(response_encoder_, encodeHeaders(_, false)); + + Buffer::OwnedImpl encoder_buffer; + EXPECT_CALL(*encoder_filters_[1], encodeData(_, true)) + .WillOnce(Invoke([&](Buffer::Instance& data, bool) { + encoder_buffer.move(data); + return FilterDataStatus::StopIterationNoBuffer; + })); + EXPECT_CALL(*encoder_filters_[1], encodeComplete()); + + decoder_filters_[1]->callbacks_->encodeHeaders( + HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false); + Buffer::OwnedImpl response_body("response"); + decoder_filters_[1]->callbacks_->encodeData(response_body, true); + + Buffer::OwnedImpl encoded_data_to_forward; + encoded_data_to_forward.move(encoder_buffer, 3); + EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("res"), false)); + EXPECT_CALL(response_encoder_, encodeData(_, false)); + encoder_filters_[1]->callbacks_->encodeData(encoded_data_to_forward, false); + + EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("ponse"), true)); + EXPECT_CALL(*encoder_filters_[0], encodeComplete()); + EXPECT_CALL(response_encoder_, encodeData(_, true)); + expectOnDestroy(); + encoder_filters_[1]->callbacks_->encodeData(encoder_buffer, true); +} + +// Use filter direct decode/encodeData() calls with trailers. +TEST_F(HttpConnectionManagerImplTest, FilterDirectDecodeEncodeDataTrailers) { + InSequence s; + setup(false, ""); + + EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { + StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_); + HeaderMapPtr headers{ + new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; + decoder->decodeHeaders(std::move(headers), false); + + Buffer::OwnedImpl fake_data("hello"); + decoder->decodeData(fake_data, false); + + HeaderMapPtr trailers{new TestHeaderMapImpl{{"foo", "bar"}}}; + decoder->decodeTrailers(std::move(trailers)); + })); + + EXPECT_CALL(*route_config_provider_.route_config_, route(_, _)); + setupFilterChain(2, 2); + + EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::Continue)); + EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::StopIteration)); + + Buffer::OwnedImpl decode_buffer; + EXPECT_CALL(*decoder_filters_[0], decodeData(_, false)) + .WillOnce(Invoke([&](Buffer::Instance& data, bool) { + decode_buffer.move(data); + return FilterDataStatus::StopIterationNoBuffer; + })); + EXPECT_CALL(*decoder_filters_[0], decodeTrailers(_)) + .WillOnce(Return(FilterTrailersStatus::StopIteration)); + EXPECT_CALL(*decoder_filters_[0], decodeComplete()); + + // Kick off the incoming data. + Buffer::OwnedImpl fake_input("1234"); + conn_manager_->onData(fake_input, false); + + Buffer::OwnedImpl decoded_data_to_forward; + decoded_data_to_forward.move(decode_buffer, 2); + EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("he"), false)) + .WillOnce(Return(FilterDataStatus::StopIterationNoBuffer)); + decoder_filters_[0]->callbacks_->decodeData(decoded_data_to_forward, false); + + EXPECT_CALL(*decoder_filters_[1], decodeData(BufferStringEqual("llo"), false)) + .WillOnce(Return(FilterDataStatus::StopIterationNoBuffer)); + decoder_filters_[0]->callbacks_->decodeData(decode_buffer, false); + + EXPECT_CALL(*decoder_filters_[1], decodeTrailers(_)); + EXPECT_CALL(*decoder_filters_[1], decodeComplete()); + decoder_filters_[0]->callbacks_->continueDecoding(); + + // Response path. + EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::Continue)); + EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false)) + .WillOnce(Return(FilterHeadersStatus::Continue)); + EXPECT_CALL(response_encoder_, encodeHeaders(_, false)); + + Buffer::OwnedImpl encoder_buffer; + EXPECT_CALL(*encoder_filters_[1], encodeData(_, false)) + .WillOnce(Invoke([&](Buffer::Instance& data, bool) { + encoder_buffer.move(data); + return FilterDataStatus::StopIterationNoBuffer; + })); + EXPECT_CALL(*encoder_filters_[1], encodeTrailers(_)) + .WillOnce(Return(FilterTrailersStatus::StopIteration)); + EXPECT_CALL(*encoder_filters_[1], encodeComplete()); + + decoder_filters_[1]->callbacks_->encodeHeaders( + HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false); + Buffer::OwnedImpl response_body("response"); + decoder_filters_[1]->callbacks_->encodeData(response_body, false); + decoder_filters_[1]->callbacks_->encodeTrailers( + HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}); + + Buffer::OwnedImpl encoded_data_to_forward; + encoded_data_to_forward.move(encoder_buffer, 3); + EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("res"), false)); + EXPECT_CALL(response_encoder_, encodeData(_, false)); + encoder_filters_[1]->callbacks_->encodeData(encoded_data_to_forward, false); + + EXPECT_CALL(*encoder_filters_[0], encodeData(BufferStringEqual("ponse"), false)); + EXPECT_CALL(response_encoder_, encodeData(_, false)); + encoder_filters_[1]->callbacks_->encodeData(encoder_buffer, false); + + EXPECT_CALL(*encoder_filters_[0], encodeTrailers(_)); + EXPECT_CALL(*encoder_filters_[0], encodeComplete()); + EXPECT_CALL(response_encoder_, encodeTrailers(_)); + expectOnDestroy(); + encoder_filters_[1]->callbacks_->continueEncoding(); +} + TEST_F(HttpConnectionManagerImplTest, MultipleFilters) { InSequence s; setup(false, ""); diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index e472d4fba02b..69d5c7c0895e 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -177,6 +177,7 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, MOCK_METHOD0(continueDecoding, void()); MOCK_METHOD2(addDecodedData, void(Buffer::Instance& data, bool streaming)); + MOCK_METHOD2(decodeData, void(Buffer::Instance& data, bool end_stream)); MOCK_METHOD0(addDecodedTrailers, HeaderMap&()); MOCK_METHOD0(decodingBuffer, const Buffer::Instance*()); MOCK_METHOD1(modifyDecodingBuffer, void(std::function)); @@ -219,6 +220,7 @@ class MockStreamEncoderFilterCallbacks : public StreamEncoderFilterCallbacks, // Http::StreamEncoderFilterCallbacks MOCK_METHOD2(addEncodedData, void(Buffer::Instance& data, bool streaming)); + MOCK_METHOD2(encodeData, void(Buffer::Instance& data, bool end_stream)); MOCK_METHOD0(addEncodedTrailers, HeaderMap&()); MOCK_METHOD0(continueEncoding, void()); MOCK_METHOD0(encodingBuffer, const Buffer::Instance*());