Skip to content

Commit

Permalink
http2: deprecates use of CallbackVisitor in CodecImpl (envoyproxy#32378)
Browse files Browse the repository at this point in the history
This change removes an unnecessary layer of abstraction by implementing Http2VisitorInterface directly, rather than passing HTTP/2 codec events through the QUICHE CallbackVisitor and nghttp2-style callbacks.

When the reloadable feature is removed, CodecImpl can be significantly simplified.

$ bazel test //test/common/http/http2/... //test/integration/...
INFO: Invocation ID: 76f7aa9f-e507-4798-892a-6e58df6deb7e
INFO: Analyzed 397 targets (0 packages loaded, 0 targets configured).
INFO: Found 295 targets and 102 test targets...
INFO: Elapsed time: 745.010s, Critical Path: 744.00s
INFO: 399 processes: 11 remote cache hit, 389 remote.
INFO: Build completed successfully, 399 total actions

Executed 98 out of 102 tests: 102 tests pass.
Commit Message: http2: deprecates use of CallbackVisitor in CodecImpl
Additional Description:
Risk Level: medium, affects handling of HTTP/2 codec events
Testing: ran unit and integration tests both with and without the feature enabled.
Docs Changes:
Release Notes:
Platform Specific Features:
Runtime guard: envoy.reloadable_features.http2_skip_callback_visitor

Signed-off-by: Biren Roy <birenroy@google.com>
  • Loading branch information
birenroy authored Mar 11, 2024
1 parent 3449fcf commit bd0130e
Show file tree
Hide file tree
Showing 6 changed files with 351 additions and 36 deletions.
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ minor_behavior_changes:
change: |
Add more conversion in the proxy status utility. It can be disabled by the runtime guard
``envoy.reloadable_features.proxy_status_mapping_more_core_response_flags``.
- area: http2
change: |
Simplifies integration with the codec by removing translation between nghttp2 callbacks and Http2VisitorInterface events.
Guarded by ``envoy.reloadable_features.http2_skip_callback_visitor``.
bug_fixes:
# *Changes expected to improve the state of the world and are unlikely to have negative effects*
Expand Down
284 changes: 255 additions & 29 deletions source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "absl/cleanup/cleanup.h"
#include "absl/container/fixed_array.h"
#include "quiche/common/quiche_endian.h"
#include "quiche/http2/adapter/callback_visitor.h"
#include "quiche/http2/adapter/nghttp2_adapter.h"
#include "quiche/http2/adapter/oghttp2_adapter.h"
Expand Down Expand Up @@ -118,8 +119,13 @@ std::unique_ptr<http2::adapter::Http2Adapter>
ProdNghttp2SessionFactory::create(const nghttp2_session_callbacks* callbacks,
ConnectionImpl* connection,
const http2::adapter::OgHttp2Adapter::Options& options) {
auto visitor = std::make_unique<http2::adapter::CallbackVisitor>(
http2::adapter::Perspective::kClient, *callbacks, connection);
std::unique_ptr<http2::adapter::Http2VisitorInterface> visitor;
if (connection->skipCallbackVisitor()) {
visitor = std::make_unique<ConnectionImpl::Http2Visitor>(connection);
} else {
visitor = std::make_unique<http2::adapter::CallbackVisitor>(
http2::adapter::Perspective::kClient, *callbacks, connection);
}
std::unique_ptr<http2::adapter::Http2Adapter> adapter =
http2::adapter::OgHttp2Adapter::Create(*visitor, options);
connection->setVisitor(std::move(visitor));
Expand All @@ -129,15 +135,26 @@ ProdNghttp2SessionFactory::create(const nghttp2_session_callbacks* callbacks,
std::unique_ptr<http2::adapter::Http2Adapter>
ProdNghttp2SessionFactory::create(const nghttp2_session_callbacks* callbacks,
ConnectionImpl* connection, const nghttp2_option* options) {
auto visitor = std::make_unique<http2::adapter::CallbackVisitor>(
http2::adapter::Perspective::kClient, *callbacks, connection);
auto adapter = http2::adapter::NgHttp2Adapter::CreateClientAdapter(*visitor, options);
auto stream_close_listener = [p = adapter.get()](http2::adapter::Http2StreamId stream_id) {
p->RemoveStream(stream_id);
};
visitor->set_stream_close_listener(std::move(stream_close_listener));
connection->setVisitor(std::move(visitor));
return adapter;
if (connection->skipCallbackVisitor()) {
auto visitor = std::make_unique<ConnectionImpl::Http2Visitor>(connection);
auto adapter = http2::adapter::NgHttp2Adapter::CreateClientAdapter(*visitor, options);
auto stream_close_listener = [p = adapter.get()](http2::adapter::Http2StreamId stream_id) {
p->RemoveStream(stream_id);
};
visitor->setStreamCloseListener(std::move(stream_close_listener));
connection->setVisitor(std::move(visitor));
return adapter;
} else {
auto visitor = std::make_unique<http2::adapter::CallbackVisitor>(
http2::adapter::Perspective::kClient, *callbacks, connection);
auto adapter = http2::adapter::NgHttp2Adapter::CreateClientAdapter(*visitor, options);
auto stream_close_listener = [p = adapter.get()](http2::adapter::Http2StreamId stream_id) {
p->RemoveStream(stream_id);
};
visitor->set_stream_close_listener(std::move(stream_close_listener));
connection->setVisitor(std::move(visitor));
return adapter;
}
}

void ProdNghttp2SessionFactory::init(ConnectionImpl* connection,
Expand Down Expand Up @@ -1755,7 +1772,8 @@ ConnectionImpl::Http2Callbacks::Http2Callbacks() {

nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks_, [](nghttp2_session*, const nghttp2_frame* frame, void* user_data) -> int {
auto status = static_cast<ConnectionImpl*>(user_data)->onBeginHeaders(frame->hd.stream_id);
Status status =
static_cast<ConnectionImpl*>(user_data)->onBeginHeaders(frame->hd.stream_id);
return static_cast<ConnectionImpl*>(user_data)->setAndCheckCodecCallbackStatus(
std::move(status));
});
Expand All @@ -1782,23 +1800,25 @@ ConnectionImpl::Http2Callbacks::Http2Callbacks() {

nghttp2_session_callbacks_set_on_begin_frame_callback(
callbacks_, [](nghttp2_session*, const nghttp2_frame_hd* hd, void* user_data) -> int {
auto status = static_cast<ConnectionImpl*>(user_data)->onBeforeFrameReceived(
Status status = static_cast<ConnectionImpl*>(user_data)->onBeforeFrameReceived(
hd->stream_id, hd->length, hd->type, hd->flags);
return static_cast<ConnectionImpl*>(user_data)->setAndCheckCodecCallbackStatus(
std::move(status));
});

nghttp2_session_callbacks_set_on_frame_recv_callback(
callbacks_, [](nghttp2_session*, const nghttp2_frame* frame, void* user_data) -> int {
auto status = static_cast<ConnectionImpl*>(user_data)->onFrameReceived(frame);
return static_cast<ConnectionImpl*>(user_data)->setAndCheckCodecCallbackStatus(
std::move(status));
auto* conn = static_cast<ConnectionImpl*>(user_data);
RELEASE_ASSERT(!conn->skipCallbackVisitor(), "Unexpected use of nghttp2 callback!");
Status status = conn->onFrameReceived(frame);
return conn->setAndCheckCodecCallbackStatus(std::move(status));
});

nghttp2_session_callbacks_set_on_stream_close_callback(
callbacks_,
[](nghttp2_session*, int32_t stream_id, uint32_t error_code, void* user_data) -> int {
auto status = static_cast<ConnectionImpl*>(user_data)->onStreamClose(stream_id, error_code);
Status status =
static_cast<ConnectionImpl*>(user_data)->onStreamClose(stream_id, error_code);
return static_cast<ConnectionImpl*>(user_data)->setAndCheckCodecCallbackStatus(
std::move(status));
});
Expand All @@ -1814,8 +1834,10 @@ ConnectionImpl::Http2Callbacks::Http2Callbacks() {
error_code = frame->rst_stream.error_code;
break;
}
return static_cast<ConnectionImpl*>(user_data)->onFrameSend(
frame->hd.stream_id, frame->hd.length, frame->hd.type, frame->hd.flags, error_code);
auto* conn = static_cast<ConnectionImpl*>(user_data);
RELEASE_ASSERT(!conn->skipCallbackVisitor(), "Unexpected use of nghttp2 callback!");
return conn->onFrameSend(frame->hd.stream_id, frame->hd.length, frame->hd.type,
frame->hd.flags, error_code);
});

nghttp2_session_callbacks_set_before_frame_send_callback(
Expand Down Expand Up @@ -1861,6 +1883,193 @@ ConnectionImpl::Http2Callbacks::Http2Callbacks() {

ConnectionImpl::Http2Callbacks::~Http2Callbacks() { nghttp2_session_callbacks_del(callbacks_); }

ConnectionImpl::Http2Visitor::Http2Visitor(ConnectionImpl* connection) : connection_(connection) {}

int64_t ConnectionImpl::Http2Visitor::OnReadyToSend(absl::string_view serialized) {
RELEASE_ASSERT(connection_->skipCallbackVisitor(), "Unexpected use of Http2Visitor!");
return connection_->onSend(reinterpret_cast<const uint8_t*>(serialized.data()),
serialized.size());
}

bool ConnectionImpl::Http2Visitor::OnFrameHeader(Http2StreamId stream_id, size_t length,
uint8_t type, uint8_t flags) {
RELEASE_ASSERT(connection_->skipCallbackVisitor(), "Unexpected use of Http2Visitor!");
ENVOY_CONN_LOG(debug, "Http2Visitor::OnFrameHeader({}, {}, {}, {})", connection_->connection_,
stream_id, length, int(type), int(flags));

if (type == NGHTTP2_CONTINUATION) {
if (current_frame_.stream_id != stream_id) {
return false;
}
current_frame_.length += length;
current_frame_.flags |= flags;
} else {
current_frame_ = {stream_id, length, type, flags};
padding_length_ = 0;
remaining_data_payload_ = 0;
}
Status status = connection_->onBeforeFrameReceived(stream_id, length, type, flags);
return 0 == connection_->setAndCheckCodecCallbackStatus(std::move(status));
}

bool ConnectionImpl::Http2Visitor::OnBeginHeadersForStream(Http2StreamId stream_id) {
Status status = connection_->onBeginHeaders(stream_id);
return 0 == connection_->setAndCheckCodecCallbackStatus(std::move(status));
}

http2::adapter::Http2VisitorInterface::OnHeaderResult
ConnectionImpl::Http2Visitor::OnHeaderForStream(Http2StreamId stream_id,
absl::string_view name_view,
absl::string_view value_view) {
// TODO PERF: Can reference count here to avoid copies.
HeaderString name;
name.setCopy(name_view.data(), name_view.size());
HeaderString value;
value.setCopy(value_view.data(), value_view.size());
const int result = connection_->onHeader(stream_id, std::move(name), std::move(value));
switch (result) {
case 0:
return HEADER_OK;
case NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE:
return HEADER_RST_STREAM;
default:
return HEADER_CONNECTION_ERROR;
}
}

bool ConnectionImpl::Http2Visitor::OnEndHeadersForStream(Http2StreamId stream_id) {
ENVOY_CONN_LOG(debug, "Http2Visitor::OnEndHeadersForStream({})", connection_->connection_,
stream_id);
Status status = connection_->onHeaders(stream_id, current_frame_.length, current_frame_.flags);
return 0 == connection_->setAndCheckCodecCallbackStatus(std::move(status));
}

bool ConnectionImpl::Http2Visitor::OnBeginDataForStream(Http2StreamId stream_id,
size_t payload_length) {
ENVOY_CONN_LOG(debug, "Http2Visitor::OnBeginDataForStream({}, {})", connection_->connection_,
stream_id, payload_length);
remaining_data_payload_ = payload_length;
padding_length_ = 0;
if (remaining_data_payload_ == 0 && (current_frame_.flags & NGHTTP2_FLAG_END_STREAM) == 0) {
ENVOY_CONN_LOG(debug, "Http2Visitor dispatching DATA for stream {}", connection_->connection_,
stream_id);
Status status = connection_->onBeginData(stream_id, current_frame_.length, current_frame_.flags,
padding_length_);
return 0 == connection_->setAndCheckCodecCallbackStatus(std::move(status));
}
ENVOY_CONN_LOG(debug, "Http2Visitor: remaining data payload: {}, end_stream: {}",
connection_->connection_, remaining_data_payload_,
bool(current_frame_.flags & NGHTTP2_FLAG_END_STREAM));
return true;
}

bool ConnectionImpl::Http2Visitor::OnDataPaddingLength(Http2StreamId stream_id,
size_t padding_length) {
padding_length_ = padding_length;
remaining_data_payload_ -= padding_length;
if (remaining_data_payload_ == 0 && (current_frame_.flags & NGHTTP2_FLAG_END_STREAM) == 0) {
ENVOY_CONN_LOG(debug, "Http2Visitor dispatching DATA for stream {}", connection_->connection_,
stream_id);
Status status = connection_->onBeginData(stream_id, current_frame_.length, current_frame_.flags,
padding_length_);
return 0 == connection_->setAndCheckCodecCallbackStatus(std::move(status));
}
ENVOY_CONN_LOG(debug, "Http2Visitor: remaining data payload: {}, end_stream: {}",
connection_->connection_, remaining_data_payload_,
bool(current_frame_.flags & NGHTTP2_FLAG_END_STREAM));
return true;
}

bool ConnectionImpl::Http2Visitor::OnDataForStream(Http2StreamId stream_id,
absl::string_view data) {
const int result =
connection_->onData(stream_id, reinterpret_cast<const uint8_t*>(data.data()), data.size());
remaining_data_payload_ -= data.size();
if (result == 0 && remaining_data_payload_ == 0 &&
(current_frame_.flags & NGHTTP2_FLAG_END_STREAM) == 0) {
ENVOY_CONN_LOG(debug, "Http2Visitor dispatching DATA for stream {}", connection_->connection_,
stream_id);
Status status = connection_->onBeginData(stream_id, current_frame_.length, current_frame_.flags,
padding_length_);
return 0 == connection_->setAndCheckCodecCallbackStatus(std::move(status));
}
ENVOY_CONN_LOG(debug, "Http2Visitor: remaining data payload: {}, end_stream: {}",
connection_->connection_, remaining_data_payload_,
bool(current_frame_.flags & NGHTTP2_FLAG_END_STREAM));
return result == 0;
}

bool ConnectionImpl::Http2Visitor::OnEndStream(Http2StreamId stream_id) {
ENVOY_CONN_LOG(debug, "Http2Visitor::OnEndStream({})", connection_->connection_, stream_id);
if (current_frame_.type == NGHTTP2_DATA) {
// `onBeginData` is invoked here to ensure that the connection has successfully validated and
// processed the entire DATA frame.
ENVOY_CONN_LOG(debug, "Http2Visitor dispatching DATA for stream {}", connection_->connection_,
stream_id);
Status status = connection_->onBeginData(stream_id, current_frame_.length, current_frame_.flags,
padding_length_);
return 0 == connection_->setAndCheckCodecCallbackStatus(std::move(status));
}
return true;
}

void ConnectionImpl::Http2Visitor::OnRstStream(Http2StreamId stream_id, Http2ErrorCode error_code) {
(void)connection_->onRstStream(stream_id, static_cast<uint32_t>(error_code));
}

bool ConnectionImpl::Http2Visitor::OnCloseStream(Http2StreamId stream_id,
Http2ErrorCode error_code) {
Status status = connection_->onStreamClose(stream_id, static_cast<uint32_t>(error_code));
if (stream_close_listener_) {
ENVOY_CONN_LOG(debug, "Http2Visitor invoking stream close listener for {}",
connection_->connection_, stream_id);
stream_close_listener_(stream_id);
}
return 0 == connection_->setAndCheckCodecCallbackStatus(std::move(status));
}

void ConnectionImpl::Http2Visitor::OnPing(Http2PingId ping_id, bool is_ack) {
const uint64_t network_order_opaque_data = quiche::QuicheEndian::HostToNet64(ping_id);
Status status = connection_->onPing(network_order_opaque_data, is_ack);
connection_->setAndCheckCodecCallbackStatus(std::move(status));
}

bool ConnectionImpl::Http2Visitor::OnGoAway(Http2StreamId /*last_accepted_stream_id*/,
Http2ErrorCode error_code,
absl::string_view /*opaque_data*/) {
Status status = connection_->onGoAway(static_cast<uint32_t>(error_code));
return 0 == connection_->setAndCheckCodecCallbackStatus(std::move(status));
}

int ConnectionImpl::Http2Visitor::OnBeforeFrameSent(uint8_t frame_type, Http2StreamId stream_id,
size_t length, uint8_t flags) {
return connection_->onBeforeFrameSend(stream_id, length, frame_type, flags);
}

int ConnectionImpl::Http2Visitor::OnFrameSent(uint8_t frame_type, Http2StreamId stream_id,
size_t length, uint8_t flags, uint32_t error_code) {
return connection_->onFrameSend(stream_id, length, frame_type, flags, error_code);
}

bool ConnectionImpl::Http2Visitor::OnInvalidFrame(Http2StreamId stream_id,
InvalidFrameError error) {
return 0 == connection_->onInvalidFrame(stream_id, http2::adapter::ToNgHttp2ErrorCode(error));
}

bool ConnectionImpl::Http2Visitor::OnMetadataForStream(Http2StreamId stream_id,
absl::string_view metadata) {
return 0 == connection_->onMetadataReceived(
stream_id, reinterpret_cast<const uint8_t*>(metadata.data()), metadata.size());
}

bool ConnectionImpl::Http2Visitor::OnMetadataEndForStream(Http2StreamId stream_id) {
return 0 == connection_->onMetadataFrameComplete(stream_id, true);
}

void ConnectionImpl::Http2Visitor::OnErrorDebug(absl::string_view message) {
connection_->onError(message);
}

ConnectionImpl::Http2Options::Http2Options(
const envoy::config::core::v3::Http2ProtocolOptions& http2_options, uint32_t max_headers_kb) {
og_options_.perspective = http2::adapter::Perspective::kServer;
Expand Down Expand Up @@ -2127,20 +2336,37 @@ ServerConnectionImpl::ServerConnectionImpl(
"found. Is it configured?");
Http2Options h2_options(http2_options, max_request_headers_kb);

auto visitor = std::make_unique<http2::adapter::CallbackVisitor>(
auto callback_visitor = std::make_unique<http2::adapter::CallbackVisitor>(
http2::adapter::Perspective::kServer, *http2_callbacks_.callbacks(), base());
auto direct_visitor = std::make_unique<Http2Visitor>(this);

if (use_oghttp2_library_) {
visitor_ = std::move(visitor);
if (skipCallbackVisitor()) {
visitor_ = std::move(direct_visitor);
} else {
visitor_ = std::move(callback_visitor);
}
adapter_ = http2::adapter::OgHttp2Adapter::Create(*visitor_, h2_options.ogOptions());
} else {
auto adapter =
http2::adapter::NgHttp2Adapter::CreateServerAdapter(*visitor, h2_options.options());
auto stream_close_listener = [p = adapter.get()](http2::adapter::Http2StreamId stream_id) {
p->RemoveStream(stream_id);
};
visitor->set_stream_close_listener(std::move(stream_close_listener));
visitor_ = std::move(visitor);
adapter_ = std::move(adapter);
if (skipCallbackVisitor()) {
auto adapter = http2::adapter::NgHttp2Adapter::CreateServerAdapter(*direct_visitor,
h2_options.options());
auto stream_close_listener = [p = adapter.get()](http2::adapter::Http2StreamId stream_id) {
p->RemoveStream(stream_id);
};
direct_visitor->setStreamCloseListener(std::move(stream_close_listener));
visitor_ = std::move(direct_visitor);
adapter_ = std::move(adapter);
} else {
auto adapter = http2::adapter::NgHttp2Adapter::CreateServerAdapter(*callback_visitor,
h2_options.options());
auto stream_close_listener = [p = adapter.get()](http2::adapter::Http2StreamId stream_id) {
p->RemoveStream(stream_id);
};
callback_visitor->set_stream_close_listener(std::move(stream_close_listener));
visitor_ = std::move(callback_visitor);
adapter_ = std::move(adapter);
}
}
sendSettings(http2_options, false);
allow_metadata_ = http2_options.allow_metadata();
Expand Down
Loading

0 comments on commit bd0130e

Please sign in to comment.