-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
http: allow decoding/encoding a header only request/response #4885
Changes from 5 commits
ed6b15d
259cde2
7ea1664
c8fb131
0122b27
5eb4cd9
1575210
3f4d517
c104aaa
cd98729
4a82d5a
1ed28a7
d5d98c7
63b1847
ad59360
b893e80
f18361e
fd5f701
0c92e8b
24f2fec
82d0c44
6c0ea6b
e36e993
5f3ff84
15d0c71
ec72f47
d0a771b
ddce29e
36c1f08
7c14b69
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,11 @@ enum class FilterHeadersStatus { | |
// Do not iterate to any of the remaining filters in the chain. Returning | ||
// FilterDataStatus::Continue from decodeData()/encodeData() or calling | ||
// continueDecoding()/continueEncoding() MUST be called if continued filter iteration is desired. | ||
StopIteration | ||
StopIteration, | ||
// Continue iteration to remaining filters, but ignore and subsequent data or trailers. This | ||
// results | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: reflow comment |
||
// in creating a header only request/response. | ||
ContinueHeadersOnly | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Name change that @lizan proposed SGTM. |
||
}; | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -721,11 +721,13 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilte | |
ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders)); | ||
state_.filter_call_state_ |= FilterCallState::DecodeHeaders; | ||
FilterHeadersStatus status = (*entry)->decodeHeaders( | ||
headers, end_stream && continue_data_entry == decoder_filters_.end()); | ||
headers, | ||
decoding_headers_only_ || (end_stream && continue_data_entry == decoder_filters_.end())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to allow There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 on ASSERTing this. |
||
state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders; | ||
ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this, | ||
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status)); | ||
if (!(*entry)->commonHandleAfterHeadersCallback(status) && | ||
|
||
if (!(*entry)->commonHandleAfterHeadersCallback(status, decoding_headers_only_) && | ||
std::next(entry) != decoder_filters_.end()) { | ||
// Stop iteration IFF this is not the last filter. If it is the last filter, continue with | ||
// processing since we need to handle the case where a terminal filter wants to buffer, but | ||
|
@@ -758,6 +760,11 @@ void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, boo | |
|
||
void ConnectionManagerImpl::ActiveStream::decodeData(ActiveStreamDecoderFilter* filter, | ||
Buffer::Instance& data, bool end_stream) { | ||
// If we previously decided to decode only the headers, do nothing here. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we should do this after resetting idle timer below? I have a more general question which I would need to do some refreshing to answer, but in the case where we switch a request/response to header only, and then "finish" the request/response, do we correct reset a downstream/upstream stream that we hasn't finished yet? From a very quick look at the router and HCM, I think we do, but could you check this also to see what you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is probably because I'm not familiar enough with the stream semantics, but what do you mean by reset in this case? Are you talking about making sure the stream is properly cleaned up after we do this transformation? Or is there some kind of explicit reset action that you're expecting to happen here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, exactly, that proper cleanup happens in all cases when we translate the request/response to headers only but we have data/trailers still coming. I'm not convinced this is completely correct. I need to spend a little time looking at code which I will do later today or tomorrow and then I can help explain my thinking better. Sorry for the delay. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I spent some time looking through the code because I also have similar concerns to @alyssawilk that we might not be handling dropping body/trailer data correctly in all the different permutations. @snowp for reference this is all the calls that ultimately end up calling With that said, I think having integration tests here that cover the various permutations are very important. So an example important integration test looks something like like:
This should result in a downstream response followed by a reset for HTTP/2 or a connection close for HTTP/1. @snowp sorry this is pretty complicated. LMK if this is enough to go on. Also, I do think the returns have to be after we reset the stream idle timer in all cases since I think we should count body that we are dropping as not idle. Can we test that also, unit test is fine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After the writing the integration tests it seems like it is reseting the stream but there's a segfault happening for HTTP/1 when we try to access the response encoder to reset it in https://github.com/envoyproxy/envoy/pull/4885/files#diff-240648b2ff3a8c5e19e5c4f57d079c2eR1711. I was able to get the test to go green by by avoiding the reset by setting local_complete, but I'm not sure if that's the right thing to do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ping on this I still think we want to return below
Sorry where are we talking about? |
||
if (decoding_headers_only_) { | ||
return; | ||
} | ||
|
||
resetIdleTimer(); | ||
|
||
// If a response is complete or a reset has been sent, filters do not care about further body | ||
|
@@ -855,6 +862,11 @@ void ConnectionManagerImpl::ActiveStream::decodeTrailers(HeaderMapPtr&& trailers | |
|
||
void ConnectionManagerImpl::ActiveStream::decodeTrailers(ActiveStreamDecoderFilter* filter, | ||
HeaderMap& trailers) { | ||
// If we previously decided to decode only the headers, do nothing here. | ||
if (decoding_headers_only_) { | ||
return; | ||
} | ||
|
||
// See decodeData() above for why we check local_complete_ here. | ||
if (state_.local_complete_) { | ||
return; | ||
|
@@ -997,11 +1009,12 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte | |
ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeHeaders)); | ||
state_.filter_call_state_ |= FilterCallState::EncodeHeaders; | ||
FilterHeadersStatus status = (*entry)->handle_->encodeHeaders( | ||
headers, end_stream && continue_data_entry == encoder_filters_.end()); | ||
headers, | ||
encoding_headers_only_ || (end_stream && continue_data_entry == encoder_filters_.end())); | ||
state_.filter_call_state_ &= ~FilterCallState::EncodeHeaders; | ||
ENVOY_STREAM_LOG(trace, "encode headers called: filter={} status={}", *this, | ||
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status)); | ||
if (!(*entry)->commonHandleAfterHeadersCallback(status)) { | ||
if (!(*entry)->commonHandleAfterHeadersCallback(status, encoding_headers_only_)) { | ||
return; | ||
} | ||
|
||
|
@@ -1093,12 +1106,15 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte | |
chargeStats(headers); | ||
|
||
ENVOY_STREAM_LOG(debug, "encoding headers via codec (end_stream={}):\n{}", *this, | ||
end_stream && continue_data_entry == encoder_filters_.end(), headers); | ||
encoding_headers_only_ || | ||
(end_stream && continue_data_entry == encoder_filters_.end()), | ||
headers); | ||
|
||
// Now actually encode via the codec. | ||
stream_info_.onFirstDownstreamTxByteSent(); | ||
response_encoder_->encodeHeaders(headers, | ||
end_stream && continue_data_entry == encoder_filters_.end()); | ||
response_encoder_->encodeHeaders( | ||
headers, | ||
encoding_headers_only_ || (end_stream && continue_data_entry == encoder_filters_.end())); | ||
|
||
if (continue_data_entry != encoder_filters_.end()) { | ||
// We use the continueEncoding() code since it will correctly handle not calling | ||
|
@@ -1107,7 +1123,7 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte | |
(*continue_data_entry)->stopped_ = true; | ||
(*continue_data_entry)->continueEncoding(); | ||
} else { | ||
maybeEndEncode(end_stream); | ||
maybeEndEncode(encoding_headers_only_ || end_stream); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are we handling maybeEndEncode and maybeEndDecode the same for the header-only case? If not, maybe worth a comment why we treat them differently? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we're handling them differently just because they're called at different locations: maybeEndDecode is called before filters are evaluated, while maybeEndEncode is called after. I'll go over this and add some comments to make this easier to understand Actually this makes me think we might not be calling endDecode properly, will investigate |
||
} | ||
} | ||
|
||
|
@@ -1145,6 +1161,11 @@ void ConnectionManagerImpl::ActiveStream::addEncodedData(ActiveStreamEncoderFilt | |
|
||
void ConnectionManagerImpl::ActiveStream::encodeData(ActiveStreamEncoderFilter* filter, | ||
Buffer::Instance& data, bool end_stream) { | ||
// If we previously decided to encode only the headers, do nothing here. | ||
if (encoding_headers_only_) { | ||
return; | ||
} | ||
|
||
resetIdleTimer(); | ||
std::list<ActiveStreamEncoderFilterPtr>::iterator entry = commonEncodePrefix(filter, end_stream); | ||
auto trailers_added_entry = encoder_filters_.end(); | ||
|
@@ -1197,6 +1218,11 @@ void ConnectionManagerImpl::ActiveStream::encodeData(ActiveStreamEncoderFilter* | |
|
||
void ConnectionManagerImpl::ActiveStream::encodeTrailers(ActiveStreamEncoderFilter* filter, | ||
HeaderMap& trailers) { | ||
// If we previously decided to encode only the headers, do nothing here. | ||
if (encoding_headers_only_) { | ||
return; | ||
} | ||
|
||
resetIdleTimer(); | ||
std::list<ActiveStreamEncoderFilterPtr>::iterator entry = commonEncodePrefix(filter, true); | ||
for (; entry != encoder_filters_.end(); entry++) { | ||
|
@@ -1360,13 +1386,18 @@ bool ConnectionManagerImpl::ActiveStreamFilterBase::commonHandleAfter100Continue | |
} | ||
|
||
bool ConnectionManagerImpl::ActiveStreamFilterBase::commonHandleAfterHeadersCallback( | ||
FilterHeadersStatus status) { | ||
FilterHeadersStatus status, bool& headers_only) { | ||
ASSERT(!headers_continued_); | ||
ASSERT(!stopped_); | ||
|
||
if (status == FilterHeadersStatus::StopIteration) { | ||
stopped_ = true; | ||
return false; | ||
} else if (status == FilterHeadersStatus::ContinueHeadersOnly) { | ||
// Set headers_only to true so we know to end early if necessary, | ||
// but continue filter iteration so we actually write the headers/run the cleanup code. | ||
headers_only = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider a debug log here - we're pretty consistent about logging headers/body/trailers so logging that we'll be swallowing them is probably helpful for debugging |
||
return true; | ||
} else { | ||
ASSERT(status == FilterHeadersStatus::Continue); | ||
headers_continued_ = true; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -97,7 +97,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
stopped_(false), dual_filter_(dual_filter) {} | ||
|
||
bool commonHandleAfter100ContinueHeadersCallback(FilterHeadersStatus status); | ||
bool commonHandleAfterHeadersCallback(FilterHeadersStatus status); | ||
bool commonHandleAfterHeadersCallback(FilterHeadersStatus status, bool& headers_only); | ||
void commonHandleBufferData(Buffer::Instance& provided_data); | ||
bool commonHandleAfterDataCallback(FilterDataStatus status, Buffer::Instance& provided_data, | ||
bool& buffer_was_streaming); | ||
|
@@ -409,6 +409,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>, | |
// is ever called, this is set to true so commonContinue resumes processing the 100-Continue. | ||
bool has_continue_headers_{}; | ||
bool is_head_request_{false}; | ||
bool decoding_headers_only_{}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Can you normalize initialization of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Maybe it would be possible to add a linter check to check that we're being consistent with initializing members to the default value? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would you mind commenting what these are used for and when they are set? |
||
bool encoding_headers_only_{}; | ||
}; | ||
|
||
typedef std::unique_ptr<ActiveStream> ActiveStreamPtr; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2701,6 +2701,132 @@ TEST_F(HttpConnectionManagerImplTest, FilterHeadReply) { | |
conn_manager_->onData(fake_input, false); | ||
} | ||
|
||
TEST_F(HttpConnectionManagerImplTest, FilterContinueHeadersOnlyHeaders) { | ||
InSequence s; | ||
setup(false, ""); | ||
|
||
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { | ||
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_); | ||
HeaderMapPtr headers{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. make_unique wherever possible |
||
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; | ||
decoder->decodeHeaders(std::move(headers), true); | ||
})); | ||
|
||
setupFilterChain(2, 2); | ||
|
||
EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true)) | ||
.WillOnce(InvokeWithoutArgs( | ||
[&]() -> FilterHeadersStatus { return FilterHeadersStatus::ContinueHeadersOnly; })); | ||
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true)) | ||
.WillOnce(Return(FilterHeadersStatus::Continue)); | ||
|
||
// Kick off the incoming data. | ||
Buffer::OwnedImpl fake_input("1234"); | ||
conn_manager_->onData(fake_input, true); | ||
|
||
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, true)) | ||
.WillOnce(Return(FilterHeadersStatus::ContinueHeadersOnly)); | ||
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true)) | ||
.WillOnce(Return(FilterHeadersStatus::Continue)); | ||
EXPECT_CALL(response_encoder_, encodeHeaders(_, true)); | ||
|
||
expectOnDestroy(); | ||
|
||
decoder_filters_[1]->callbacks_->encodeHeaders( | ||
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, true); | ||
|
||
Buffer::OwnedImpl response_body("response"); | ||
decoder_filters_[1]->callbacks_->encodeData(response_body, true); | ||
} | ||
|
||
TEST_F(HttpConnectionManagerImplTest, FilterContinueHeadersOnlyData) { | ||
InSequence s; | ||
setup(false, ""); | ||
|
||
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { | ||
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_); | ||
HeaderMapPtr headers{ | ||
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; | ||
decoder->decodeHeaders(std::move(headers), false); | ||
|
||
Buffer::OwnedImpl fake_data("hello"); | ||
decoder->decodeData(fake_data, true); | ||
})); | ||
|
||
setupFilterChain(2, 2); | ||
|
||
EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false)) | ||
.WillOnce(InvokeWithoutArgs( | ||
[&]() -> FilterHeadersStatus { return FilterHeadersStatus::ContinueHeadersOnly; })); | ||
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true)) | ||
.WillOnce(Return(FilterHeadersStatus::Continue)); | ||
|
||
// Kick off the incoming data. | ||
Buffer::OwnedImpl fake_input("1234"); | ||
conn_manager_->onData(fake_input, false); | ||
|
||
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false)) | ||
.WillOnce(Return(FilterHeadersStatus::ContinueHeadersOnly)); | ||
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true)) | ||
.WillOnce(Return(FilterHeadersStatus::Continue)); | ||
EXPECT_CALL(response_encoder_, encodeHeaders(_, true)); | ||
|
||
expectOnDestroy(); | ||
|
||
decoder_filters_[1]->callbacks_->encodeHeaders( | ||
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false); | ||
|
||
Buffer::OwnedImpl response_body("response"); | ||
decoder_filters_[1]->callbacks_->encodeData(response_body, true); | ||
} | ||
|
||
TEST_F(HttpConnectionManagerImplTest, FilterContinueHeadersOnlyTrailers) { | ||
InSequence s; | ||
setup(false, ""); | ||
|
||
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void { | ||
StreamDecoder* decoder = &conn_manager_->newStream(response_encoder_); | ||
HeaderMapPtr headers{ | ||
new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "GET"}}}; | ||
decoder->decodeHeaders(std::move(headers), false); | ||
|
||
Buffer::OwnedImpl fake_data("hello"); | ||
decoder->decodeData(fake_data, false); | ||
|
||
HeaderMapPtr trailers{new TestHeaderMapImpl{{"foo", "bar"}}}; | ||
decoder->decodeTrailers(std::move(trailers)); | ||
})); | ||
|
||
setupFilterChain(2, 2); | ||
|
||
EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false)) | ||
.WillOnce(InvokeWithoutArgs( | ||
[&]() -> FilterHeadersStatus { return FilterHeadersStatus::ContinueHeadersOnly; })); | ||
EXPECT_CALL(*decoder_filters_[1], decodeHeaders(_, true)) | ||
.WillOnce(Return(FilterHeadersStatus::Continue)); | ||
|
||
// Kick off the incoming data. | ||
Buffer::OwnedImpl fake_input("1234"); | ||
conn_manager_->onData(fake_input, false); | ||
|
||
EXPECT_CALL(*encoder_filters_[1], encodeHeaders(_, false)) | ||
.WillOnce(Return(FilterHeadersStatus::ContinueHeadersOnly)); | ||
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, true)) | ||
.WillOnce(Return(FilterHeadersStatus::Continue)); | ||
EXPECT_CALL(response_encoder_, encodeHeaders(_, true)); | ||
|
||
expectOnDestroy(); | ||
|
||
decoder_filters_[1]->callbacks_->encodeHeaders( | ||
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, false); | ||
|
||
Buffer::OwnedImpl response_body("response"); | ||
decoder_filters_[1]->callbacks_->encodeData(response_body, false); | ||
|
||
HeaderMapPtr response_trailers{new TestHeaderMapImpl{{"x-trailer", "1"}}}; | ||
decoder_filters_[1]->callbacks_->encodeTrailers(std::move(response_trailers)); | ||
} | ||
|
||
TEST_F(HttpConnectionManagerImplTest, FilterAddBodyContinuation) { | ||
InSequence s; | ||
setup(false, ""); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/and/any